Java WordCount


Word count with the Spark Java API, using anonymous inner classes for each transformation:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // read the input file as an RDD of lines
        JavaRDD<String> lines = sc.textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt");

        // split each line on spaces into individual words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairRDD = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // print each (word, count) pair
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordcount) throws Exception {
                System.out.println(wordcount._1 + ":" + wordcount._2);
            }
        });

        sc.stop();
    }
}
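The anonymous inner classes above are the pre-Java-8 style. Since the FlatMapFunction here returns an Iterator, this is Spark 2.x code, and the same job can be written with lambdas. A minimal sketch under that assumption (the class name WordCountLambda is mine, the input path is the one from the original):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt");

        // split lines into words, map each word to (word, 1), then sum counts per word
        JavaPairRDD<String, Integer> wordCounts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        wordCounts.foreach(wc -> System.out.println(wc._1 + ":" + wc._2));
        sc.stop();
    }
}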
Word count with the Hadoop MapReduce Java API:

package com.huawei.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // split each tab-delimited line and emit (word, 1) for every word
            String line = value.toString();
            String[] splited = line.split("\t");
            for (String word : splited) {
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                              Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // sum all the 1s emitted for this word
            long count = 0L;
            for (LongWritable times : v2s) {
                count += times.get();
            }
            LongWritable v3 = new LongWritable(count);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        // required: lets Hadoop locate the jar that contains this class
        job.setJarByClass(WordCountApp.class);
        // the Mapper class this job uses
        job.setMapperClass(MyMapper.class);
        // the mapper's output <k2, v2> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // the Reducer class this job uses
        job.setReducerClass(MyReducer.class);
        // the reducer's output <k3, v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // where the input data is read from
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // where the output data is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // true: report progress to the user while the job runs
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
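Because MyReducer just sums longs, and its input types equal its output types (Text, LongWritable), it can also safely serve as a combiner, pre-aggregating counts on the map side to cut shuffle traffic. An optional one-line addition to main(), sketched here, not part of the original lab code:

// optional: run the reducer as a map-side combiner; safe because summation
// is associative and commutative and the reducer's input and output types match
job.setCombinerClass(MyReducer.class);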
 
Build, deploy, and run steps on the cluster (the jar must be downloaded before it can be unpacked, and the MapReduce input must be uploaded before the job runs, so the commands are ordered accordingly):

# download and unpack the JDK
wget https://hcip-materials.obs.cn-north-4.myhuaweicloud.com/jdk-8u341-linux-x64.tar.gz
tar -zxvf jdk-8u341-linux-x64.tar.gz

# copy the HDFS API jar to the cluster node and log in
scp ~/eclipse-workspace/HDFSAPI/target/HDFSAPI-jar-with-dependencies.jar root@xxx.xxx.xxx.xxx:/root
ssh root@xxx.xxx.xxx.xxx

# run the HDFS API examples
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile2
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file11.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.DeleteFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile

# upload the word count input, run the MapReduce job, and inspect the result
hdfs dfs -mkdir /user/user1
hdfs dfs -put MR_data /user/user1/
yarn jar MRAPI-jar-with-dependencies.jar com.huawei.mapreduce.wordcount.WordCountApp /user/user1/MR_data /user/user1/MR_out
hdfs dfs -ls /user/user1/MR_out/
hdfs dfs -cat /user/user1/MR_out/part-r-00000

# prepare directories for the MapReduce sort exercise and inspect its three output partitions
hdfs dfs -mkdir -p /user/user1/MR/input
hdfs dfs -mkdir -p /user/user1/MR/output
hdfs dfs -put mrsort.txt /user/user1/MR/input
hdfs dfs -ls /user/user1/MR/output
hdfs dfs -cat /user/user1/MR/output/part-r-00000
hdfs dfs -cat /user/user1/MR/output/part-r-00001
hdfs dfs -cat /user/user1/MR/output/part-r-00002
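The source of the HDFSAPI classes invoked above (IsFile, CreateFile1/2, ScanFile, DeleteFile) is not included in this post. For reference, a file-existence check like IsFile comes down to a few calls on Hadoop's FileSystem API. The sketch below is a hypothetical reconstruction, not the lab's actual code; the default path is only illustrative:

package com.huawei.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// hypothetical sketch of an IsFile-style check; the real lab class may differ
public class IsFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up cluster settings from the classpath
        FileSystem fs = FileSystem.get(conf);
        // illustrative default path, matching the one used by ScanFile above
        Path path = new Path(args.length > 0 ? args[0] : "/user/test/hdfs/file10.txt");
        if (fs.exists(path) && fs.getFileStatus(path).isFile()) {
            System.out.println(path + " is a file");
        } else {
            System.out.println(path + " does not exist or is not a file");
        }
        fs.close();
    }
}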