import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = sc.textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt");

        // Split each line on spaces into individual words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairRDD = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> wordCounts = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print each (word, count) pair.
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordcount) throws Exception {
                System.out.println(wordcount._1 + ":" + wordcount._2);
            }
        });

        sc.close();
    }
}
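
For comparison, the same pipeline can be written far more compactly with Java 8 lambdas. This is only a sketch assuming Spark 2.x or later (where flatMap expects an Iterator); the class name WordCountLambda is introduced here for illustration and the input path is reused from the example above.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        // JavaSparkContext implements Closeable, so try-with-resources stops it for us.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> counts = sc
                    .textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt")
                    .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // line -> words
                    .mapToPair(word -> new Tuple2<>(word, 1))                   // word -> (word, 1)
                    .reduceByKey(Integer::sum);                                 // sum counts per word
            counts.foreach(t -> System.out.println(t._1 + ":" + t._2));
        }
    }
}

To run this on a cluster instead of locally, the setMaster("local") call is usually dropped and the master is supplied through spark-submit instead.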
package com.huawei.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Note: the input is expected to be tab-separated; emit (word, 1) for each token.
            String[] splited = line.split("\t");
            for (String word : splited) {
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable times : v2s) {
                count += times.get();
            }
            LongWritable v3 = new LongWritable(count);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        // Required: tells Hadoop which jar contains this job's classes.
        job.setJarByClass(WordCountApp.class);
        // Mapper class used by this job.
        job.setMapperClass(MyMapper.class);
        // Types of the mapper's output <k2, v2>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer class used by this job.
        job.setReducerClass(MyReducer.class);
        // Types of the reducer's output <k3, v3>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Where the input data comes from.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Where the output data is written.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // true: print progress information to the console while the job runs.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
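
One optimization worth noting (not part of the original job): because the reduce logic is a plain sum, which is associative and commutative, the same reducer class can also be registered as a combiner, so partial counts are aggregated on the map side before the shuffle. A minimal sketch, added to the job setup in main():

        // Optional: reuse MyReducer as a map-side combiner to pre-aggregate counts
        // and reduce the amount of data shuffled to the reducers.
        job.setCombinerClass(MyReducer.class);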
# Download and unpack JDK 8.
wget https://hcip-materials.obs.cn-north-4.myhuaweicloud.com/jdk-8u341-linux-x64.tar.gz
tar -zxvf jdk-8u341-linux-x64.tar.gz

# Copy the packaged HDFS API jar to the cluster node and log in.
scp ~/eclipse-workspace/HDFSAPI/target/HDFSAPI-jar-with-dependencies.jar root@xxx.xxx.xxx.xxx:/root
ssh root@xxx.xxx.xxx.xxx
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile2
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file11.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.DeleteFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
# Upload the input data, run the WordCount MapReduce job, then inspect the result.
hdfs dfs -mkdir /user/user1
hdfs dfs -put MR_data /user/user1/
yarn jar MRAPI-jar-with-dependencies.jar com.huawei.mapreduce.wordcount.WordCountApp /user/user1/MR_data /user/user1/MR_out
hdfs dfs -ls /user/user1/MR_out/
hdfs dfs -cat /user/user1/MR_out/part-r-00000
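
As a rough illustration (the actual MR_data contents are not shown here), an input file with tab-separated words such as

hello	world	hello
hello	hadoop

would produce a part-r-00000 along the lines of

hadoop	1
hello	3
world	1

with keys in the default Text sort order and the key/value separated by a tab.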
# Create input/output directories for the next job, upload mrsort.txt, and inspect the three output partitions.
hdfs dfs -mkdir -p /user/user1/MR/input
hdfs dfs -mkdir -p /user/user1/MR/output
hdfs dfs -put mrsort.txt /user/user1/MR/input
hdfs dfs -ls /user/user1/MR/output
hdfs dfs -cat /user/user1/MR/output/part-r-00000
hdfs dfs -cat /user/user1/MR/output/part-r-00001
hdfs dfs -cat /user/user1/MR/output/part-r-00002