基于Hadoop的MapReduce

打印 上一主题 下一主题

主题 850|帖子 850|积分 2550

一、引言


身处大数据时代,数据洪流汹涌来袭,企业运营与决策对数据的倚重达到前所未有的程度。数据量呈爆炸式增长,类型繁杂多样,处置惩罚效率要求严苛,这些挑衅仿若巨石,横亘在企业前行门路上。与此同时,机遇隐匿其中,善用大数据者,仿若手握利刃,可精准剖析市场趋势、洞悉客户需求,进而抢占发展先机。Hadoop 与 MapReduce 技能应运而生,好似破局关键,为高效处置惩罚海量数据开发极新路径,而匀称数盘算在数据解读中占据核心职位,于诸多领域彰显关键代价。本文将围绕其原理、实操睁开深度剖析,呈上一份详确指南。
二、大数据时代挑衅

(一)数据量增长之困


企业业务拓展与数字化转型并驾齐驱,海量数据如潮流般每日奔涌而至,结构化、半结构化、非结构化数据交织混杂。传统数据处置惩罚手段在这汹涌数据潮前尽显疲态,效率大打扣头,成本却节节攀升。数据量攀升还牵出数据质量隐忧,复杂数据致使传统方法难以精准处置惩罚,误差、遗漏频现。业务持续推进,要求企业实时、精准捕捉数据动态,一套高效数据处置惩罚与管理体系亟待搭建,以此稳固数据 “根基”,确保决策有据可依。
(二)数据多样性困难


大数据舞台上,数据类型异彩纷呈,文本、图像、视频等非结构化数据与关系型、时间序列等结构化数据同场 “竞技”。各异数据类型需适配差别处置惩罚技能与方法,且彼此间关联错综复杂,纵横交织。企业若想深挖数据富矿,解锁潜藏代价,多元数据处置惩罚、分析本领不可或缺,单一技能手段难以驾驭这复杂局面。
(三)数据处置惩罚效率瓶颈


市场风云变幻,机遇稍纵即逝,企业决策与业务优化亟需快速数据支持。怎样海量且复杂的数据,让处置惩罚、分析耗时漫长,决策被迫滞后,商机频频错失。如今,高效数据处置惩罚、分析技能成为企业 “刚需”,以求数据处置惩罚兼具速率与精度,契合实时性、准确性严苛标准。
三、Hadoop 与 MapReduce 简介

(一)Hadoop 核心理念


Hadoop 仿若一位智慧 “指挥官”,将庞杂大数据拆解为诸多小块,巧妙分发至多个节点并行处置惩罚,让数据处置惩罚效率呈指数级跃升。其秉持 “分而治之” 策略,把数据存储与盘算任务分摊至集群节点,化解单机处置惩罚压力,实现大规模数据高效运算。
(二)MapReduce 编程模子


MapReduce 作为 Hadoop “心脏”,运作逻辑精妙。Map 阶段是数据 “拆解工”,把输入数据剁碎成小片断,逐块实行 Map 操纵,原始数据经此摇身一变,化作中心结果;Reduce 阶段则是 “组装匠”,将分散中心结果收拢归并,雕琢出最终成果。这般设计大幅简化分布式盘算复杂度,开发职员得以超脱底层繁琐细节,心无旁骛聚焦数据处置惩罚逻辑。不过,它也并非完美无缺,大量数据网络传输时,仿若遭遇交通堵塞,时间无端损耗,性能瓶颈凸显。为此,Hadoop 引入数据本地化、压缩、传输优化等 “疏通策略”,力保数据流通处置惩罚,性能稳步上扬。
四、匀称数盘算重要性及应用场景

(一)数据统计与分析基石


匀称数宛如数据海洋中的 “灯塔”,直观勾勒数据总体轮廓与趋势,助分析师敏捷定位数据特性。借由它,可敏锐洞察数据异常值、波动轨迹,深挖数据潜藏代价;也是数据比力、评估 “标尺”,权衡差别数据集优劣短长,为决策输送关键参考。
(二)分布式体系 “黏合剂”


分布式体系内,数据源与处置惩罚节点四散分布,数据同步举步维艰。匀称数盘算此时挺身而出,整合各节点数据,熨平差别、剔除异常,好似强力 “黏合剂”,促成数据归一,夯实数据融合底子,让体系数据精准可靠。
(三)呆板学习与数据发掘 “助推器”


在呆板学习与数据发掘赛道上,匀称数同样表现杰出。盘算相似度、间隔度量时,常被奉为特性值、基准值,为数据分类、聚类精准 “导航”;异常检测、数据洗濯场景中,化身 “滤网”,精准筛除异常数据,提纯数据质量,提升模子预测精度与泛化本领。
五、MapReduce 盘算匀称数原理

(一)MapReduce 编程模子详析

1. Map 阶段:数据拆分与转换先锋


Map 阶段是数据处置惩罚 “排头兵”,原始数据在此被利索拆解成浩繁小任务,并行奔赴各 Mapper 节点 “开工”。Mapper 依循编写好的 Map 函数指令,将数据块处置惩罚成键值对情势输出,这些键值对如同接力棒,无缝传递至 Reduce 阶段。此阶段不仅是数据物理拆分,更是逻辑重塑,为后续精细盘算筑牢根基。
2. Reduce 阶段:数据汇总与盘算中枢


Reduce 阶段身负数据汇总 “重任”,接收 Map 阶段产出的中心键值对,依键聚合对应值,精准求和、计数,算出匀称值。Reduce 函数经心编排这一系列操纵,输出精准结果,或存储备用,或移交后续流程深加工。MapReduce 模子凭借简洁设计与强盛功能,兼具易用、可扩展特质,在大数据战场大显身手,广受青睐。
(二)匀称数盘算思绪拆解

1. Map 阶段:数据预处置惩罚 “流水线”


Map 伊始,数据切块分发给 Mapper,后者依规则将数据项转为键值对,键为数据独特标识,值是数据真身。过程涵盖拆分、转换、洗濯、过滤多道工序:拆分细化数据便于操纵;转换赋予数据相宜盘算 “外衣”;洗濯如拂尘,掸去噪声、异常值;过滤似滤网,筛除无用数据,输出 “清洁” 数据,助力后续高效运算。
2. Reduce 阶段:匀称值 “锻造” 工坊


此阶段同键数据齐聚一堂,开启求和、计数之旅。求和累加之余,精准统计数据条数,二者相除,匀称值应运而生。全程高度并行、机动扩展,集群盘算资源得以充实调动,差别规模、复杂度数据集皆能从容应对,为数据分析呈上坚固数据支持。
六、Hadoop 环境搭建与配置

(一)下载安装:夯实底子


开启 Hadoop 探索之旅,首站便是奔赴官网,揽获最新稳固版本 “上车票”。下载完毕,依循官方指南步步拆解安装包、精细配置文件。这一过程容不得丝毫马虎,每个步骤皆关乎体系后续稳固运行,是构建坚固 Hadoop 环境的关键 “基石”。
(二)环境变量配置:打通 “经脉”


环境变量配置是 Hadoop 命令畅行体系的 “通行证”。将 Hadoop bin 目录精准嵌入 PATH 变量,体系方能精准定位可实行文件;HADOOP_HOME 等相关变量依体系与版本特性妥善调适,确保环境变量 “经脉” 通畅,Hadoop 各类命令一呼即应。
(三)格式化 NameNode:激活集群 “心脏”


NameNode 作为 Hadoop 分布式文件体系(HDFS)核心 “心脏”,掌控文件体系元数据存储与管理大权。格式化 NameNode 前,审慎核查 HDFS 配置文件,地址、副本数等参数准确无误后,方可启动。此操纵宛如激活集群 “心脏”,初始化元数据存储目录,NameNode 自此开启监听,统筹文件、数据块调度。

除此之外,网络环境调适保障节点通信无阻;安全策略设定筑牢数据隐私 “防火墙”;与其他体系集成考量实现多元协同,诸多细节依应用场景量文体衣,雕琢出契合需求的 Hadoop 环境。
七、编写代码实现匀称数盘算

(一)Mapper 类:数据读取与转换 “尖兵”

1. 数据读取:定制输入 “流水线”


InputSplitter 化身数据切割 “利刃”,依数据特质定制分割策略,文本按行、段落拆解,二进制依固定尺寸或特殊标识裁切;InputFormat 协同 RecordReader 接口,将分割片断重塑为 Mapper 适配的键值对情势,搭建顺畅数据输入通道,确保数据精准入 “膛”。
2. 数据转换:格式重塑 “魔法”


原始数据常隐匿于非结构化 “外壳” 下,Mapper 发挥数据类型转换 “魔法”,字符串、字符数组等摇身化作整数、浮点数,贴合后续盘算需求。过程严密监控异常,杜绝因格式毛病引发盘算 “毛病”,并实时累加求和、记载数据个数,成果封装成中心键值对,精准 “投递” 至 Reducer。
3. 发射中心键值对:数据交代 “信使”


经处置惩罚的数据借由 context.write () 方法飞赴 Reducer,键值对结构经心雕琢,标识与处置惩罚结果精准匹配,保障传输高效无误,好似尽职 “信使”,传递关键信息。
(二)Reducer 类:数据汇总与匀称值输出 “巨匠”

1. 接收中心键值对:数据 “收纳”


Reducer 继续基类、重写 reduce 方法,搭建接收中心键值对 “港湾”,迭代器高效遍历,逐一处置惩罚键值对,不错过任何关键数据。
2. 数据汇总:累加整合 “工坊”


哈希表等数据结构粉墨登场,依键聚合、累加对应值,计数同步更新,有条不紊梳理数据,为匀称值盘算备齐 “原料”。
3. 匀称值盘算与输出:成果 “雕琢”


累加值与计数相除,匀称值跃然眼前,盘算全程紧盯分母零值,巧妙规避除零 “陷阱”。最闭幕果封装入键值对,借上下文对象精准输出至指定路径,格式化输出 “梳妆打扮”,方便后续剖析应用。
(三)Driver 类:任务管控 “指挥官”

1. 设置任务参数:精准 “结构”


Driver 类操持任务参数设置大权,输入、输出路径明确数据 “来龙去脉”;Mapper、Reducer 数量依数据规模、任务需求精细调配;作业名称、内存大小、优先级等参数全盘斟酌,平衡资源使用与实行效率,为任务稳健实行铺就坦途。
2. 提交任务:启动 “引擎”


参数停当,JobClient 携任务请求奔赴 Hadoop 集群,与 ResourceManager 深度 “洽谈”,申请资源、点燃任务 “引擎”。提交前细致核验输入输出路径正当性,确保依靠库、配置文件就位,杜绝实行隐患。
3. 监控任务进度:全程 “护航”


任务动身,Driver 类借 JobClient 实时 “瞭望”,Map、Reduce 阶段进度尽收眼底,运行状态、速率、错误信息全方位把控,遇异常敏捷 “拉响警报”,回调函数实时响应,化解危急。
4. 获取输出结果:收获 “果实”


任务功成,Driver 类奔赴输出路径 “收割” 成果,HDFS 命令行工具或 API 助力剖析、转换格式,核验数据完整准确,为后续决策呈上 “定心丸”。
八、案例实操:某中学月考成绩匀称分析


以某中学月考成绩统计为实战 “演武场”,深度践行上述理论。

(一)Map 阶段


java
  1. package com.hadoop.mapreduce.qqqqqq;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. public class qqqMap extends Mapper<LongWritable, Text, Text, IntWritable>{
  8.     // map 方法是 Mapper 类的核心方法,它处理每一条输入记录
  9.     private Text outkey = new Text();
  10.     private IntWritable outvalue = new IntWritable();
  11.     @Override
  12.     protected void map(LongWritable key, Text value, Context context)
  13.             throws IOException, InterruptedException {
  14.         // 将输入的一行文本转换为字符串
  15.         String line = value.toString();
  16.         // 按逗号分割这一行,假设每行包含多个以逗号分隔的数字
  17.         String[] numbers = line.split(",");
  18.         // 遍历分割后的数组,处理每一个数字
  19.         String qq  =  numbers[1];
  20.         int qqq = Integer.parseInt(numbers[2]);
  21.         outkey.set(qq);
  22.         outvalue.set(qqq);
  23.         context.write(outkey,outvalue);
  24.     }
  25. }
复制代码

在此 Map 代码片断中,精准读取成绩表每行数据,逗号为界拆解,科目名称设为键,成绩数值封装为值,高效输出键值对,为 Reduce 阶段呈上预处置惩罚数据。
(二)Reduce 阶段


java
  1. package com.hadoop.mapreduce.qqqqqq;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. public class qqqReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
  7.     // reduce 方法是 Reducer 类的核心方法,它处理来自 Mapper 的相同键的所有值
  8.     @Override
  9.     protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  10.         // 初始化总和和计数器
  11.         int sum = 0;
  12.         int count = 0;
  13.         // 遍历所有值,累加总和并增加计数器
  14.         for (IntWritable val : values) {
  15.             sum += val.get(); // 总和加上当前键的值
  16.             count++; // 计数器加1
  17.         }
  18.         // 计算平均值
  19.         double average =  sum / count;
  20.         // 输出最终结果,键是字符串 "Average",值是计算出的平均值
  21.         context.write(key, new IntWritable((int) average));
  22.     }
  23. }
复制代码

Reduce 代码稳健接收 Map 阶段成果,遍历同科目成绩值,累加求和、精准计数,算出匀称成绩,封装输出,圆满告竣各科目匀称成绩统计使命。
(三)Driver 阶段


java
  1. package com.hadoop.mapreduce.qqqqqq;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. public class qqqDriver {
  10.     public static void main(String[] args) throws Exception {
  11.         // 创建配置对象
  12.         Configuration conf = new Configuration();
  13.         // 创建 Job 实例
  14.         Job job = Job.getInstance(conf);
  15.         // 设置 Job 的主类,即包含 main 方法的类
  16.         job.setJarByClass(qqqDriver.class);
  17.         // 设置 Mapper 类
  18.         job.setMapperClass(qqqMap.class);
  19.         // 设置 Reducer 类
  20.         job.setReducerClass(qqqReduce.class);
  21.         // 设置输出的键类型
  22.         job.setOutputKeyClass(Text.class);
  23.         // 设置输出的值类型
  24.         job.setOutputValueClass(IntWritable.class);
  25.         // 设置输入文件路径
  26.         FileInputFormat.addInputPath(job, new Path("E:\\dhy\\java\\Hadoop\\src\\main\\java\\com\\hadoop\\mapreduce\\qqqqqq\\import"));
  27.         // 设置输出文件路径
  28.         FileOutputFormat.setOutputPath(job, new Path("E:\\dhy\\java\\Hadoop\\src\\main\\java\\com\\hadoop\\mapreduce\\qqqqqq\\output\\qqqq33"));
  29.         //7.提交jop
  30.         boolean result = job.waitForCompletion(true);
  31.     }
  32. }
复制代码
最闭幕果:

Driver 代码作为 “指挥官”,全方位统筹任务。配置初始化、作业实例搭建、类与路径参数精准设定,一气呵成提交任务,耐心等待实行完毕,最终从指定路径收获各科目匀称成绩 “硕果”,为教学评估、学生辅导输送关键数据洞察。
Mapper 类代码(WordCountMapper.java)

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. // 继承Mapper类,指定输入的键类型为LongWritable(偏移量),输入的值类型为Text(一行文本),输出的键类型为Text(单词),输出的值类型为IntWritable(单词出现次数)
  7. public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  8.     private final static IntWritable one = new IntWritable(1);
  9.     private Text word = new Text();
  10.     @Override
  11.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  12.         // 将输入的一行文本转换为字符串
  13.         String line = value.toString();
  14.         // 按空格分割字符串,得到单词数组
  15.         String[] words = line.split(" ");
  16.         // 遍历单词数组,对每个单词进行处理
  17.         for (String w : words) {
  18.             word.set(w);
  19.             // 输出键值对,键为单词,值为1(表示出现了1次)
  20.             context.write(word, one);
  21.         }
  22.     }
  23. }
复制代码

在这个 Mapper 类中,核心的map方法负责将输入的文本行举行处置惩罚,把每行文本拆分成一个个单词,并输出以单词为键、出现次数 1 为值的键值对。
2. Reducer 类代码(WordCountReducer.java)

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. // 继承Reducer类,指定输入的键类型为Text(单词),输入的值类型为IntWritable(单词出现的次数列表),输出的键类型为Text(单词),输出的值类型为IntWritable(单词最终出现次数)
  6. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  7.     @Override
  8.     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  9.         int sum = 0;
  10.         // 遍历从Mapper传递过来的同一个单词对应的所有出现次数的值,进行累加求和
  11.         for (IntWritable val : values) {
  12.             sum += val.get();
  13.         }
  14.         // 将累加后的单词出现次数封装成IntWritable类型
  15.         IntWritable result = new IntWritable(sum);
  16.         // 输出最终结果,键为单词,值为单词的总出现次数
  17.         context.write(key, result);
  18.     }
  19. }
复制代码
Reducer 类中的reduce方法会接收 Mapper 输出的相同键(单词)的所有值(多个 1),然后将这些值举行累加,得到该单词在整个文本文件中的总出现次数,并输出最终的键值对结果。
3. Driver 类代码(WordCountDriver.java)

java
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.io.IOException;
  9. public class WordCountDriver {
  10.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  11.         // 创建配置对象
  12.         Configuration conf = new Configuration();
  13.         // 创建Job实例
  14.         Job job = Job.getInstance(conf);
  15.         // 设置Job的主类,即包含main方法的类
  16.         job.setJarByClass(WordCountDriver.class);
  17.         // 设置Mapper类
  18.         job.setMapperClass(WordCountMapper.class);
  19.         // 设置Reducer类
  20.         job.setReducerClass(WordCountReducer.class);
  21.         // 设置输出的键类型
  22.         job.setOutputKeyClass(Text.class);
  23.         // 设置输出的值类型
  24.         job.setOutputValueClass(IntWritable.class);
  25.         // 设置输入文件路径,可以从命令行参数传入,这里假设传入的第一个参数是输入路径
  26.         FileInputFormat.addInputPath(job, new Path(args[0]));
  27.         // 设置输出文件路径,可以从命令行参数传入,这里假设传入的第二个参数是输出路径
  28.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  29.         // 提交任务,并等待任务完成,返回任务执行的结果(成功为true,失败为false)
  30.         boolean result = job.waitForCompletion(true);
  31.         System.exit(result? 0 : 1);
  32.     }
  33. }
复制代码
Driver 类作为整个 MapReduce 任务的入口,负责设置任务相关的各种参数,包括配置对象、Mapper 和 Reducer 类、输入输出的键值类型、输入输出路径等,最后提交任务到 Hadoop 集群实行,并根据任务实行结果退出程序。
你可以按照以下步骤来运行这个案例代码:

  • 确保已经安装并配置好 Hadoop 环境。
  • 将上述三个 Java 类的代码编译打包成一个 JAR 文件(比方wordcount.jar),可以使用雷同如下命令(假设你已经安装了 JDK 而且配置好了相关环境变量):
bash
  1. javac -classpath `hadoop classpath` WordCountMapper.java WordCountReducer.java WordCountDriver.java
  2. jar -cvf wordcount.jar *.class
复制代码

  • 将预备好的文本文件(比方input.txt)上传到 Hadoop 分布式文件体系(HDFS)的某个目录下(假设为/input目录),可以使用命令hdfs dfs -put input.txt /input。
  • 运行 MapReduce 任务,实行命令如下(假设输出路径为/output目录):
bash
  1. hadoop jar wordcount.jar WordCountDriver /input /output
复制代码
这样就可以通过这个简朴的 MapReduce 程序统计出文本文件中各个单词的出现次数了,结果会保存在 HDFS 的/output目录下。
以下是一个使用 Hadoop 和 MapReduce 举行数据排序的案例代码:


  • 创建一个 Java 项目,并在项目中创建以下三个文件:

    • SortMapper.java:Mapper 类,用于从输入数据中提取键值对,并将键作为排序的依据。
    • SortReducer.java:Reducer 类,用于接收 Mapper 输出的键值对,并举行排序和输出。
    • SortDriver.java:Driver 类,用于设置任务参数并提交任务到 Hadoop 集群。在 SortMapper.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. public class SortMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  6.     @Override
  7.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  8.         // 将输入的一行文本转换为字符串
  9.         String line = value.toString();
  10.         // 提取每行文本的第一个字段作为键
  11.         String keyField = line.split(",")[0];
  12.         // 将键转换为 LongWritable 类型
  13.         LongWritable keyValue = new LongWritable(Long.parseLong(keyField));
  14.         // 输出键值对
  15.         context.write(new Text(keyValue.toString()), value);
  16.     }
  17. }
复制代码

在 SortReducer.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. public class SortReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  6.     @Override
  7.     protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  8.         // 对 values 进行排序
  9.         LongWritable[] sortedValues = new LongWritable[values.size()];
  10.         int i = 0;
  11.         for (LongWritable value : values) {
  12.             sortedValues[i++] = value;
  13.         }
  14.         Arrays.sort(sortedValues);
  15.         // 输出排序后的键值对
  16.         for (LongWritable value : sortedValues) {
  17.             context.write(key, value);
  18.         }
  19.     }
  20. }
复制代码

在 SortDriver.java 文件中,编写以下代码:

java
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class SortDriver {
  9.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  10.         // 创建配置对象
  11.         Configuration conf = new Configuration();
  12.         // 创建 Job 实例
  13.         Job job = Job.getInstance(conf);
  14.         // 设置 Job 的主类,即包含 main 方法的类
  15.         job.setJarByClass(SortDriver.class);
  16.         // 设置 Mapper 类
  17.         job.setMapperClass(SortMapper.class);
  18.         // 设置 Reducer 类
  19.         job.setReducerClass(SortReducer.class);
  20.         // 设置输出的键类型
  21.         job.setOutputKeyClass(Text.class);
  22.         // 设置输出的值类型
  23.         job.setOutputValueClass(LongWritable.class);
  24.         // 设置输入文件路径
  25.         FileInputFormat.addInputPath(job, new Path(args[0]));
  26.         // 设置输出文件路径
  27.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  28.         // 提交任务,并等待任务完成
  29.         boolean result = job.waitForCompletion(true);
  30.         System.exit(result? 0 : 1);
  31.     }
  32. }
复制代码


  • 编译并打包代码,天生 JAR 文件。
  • 将必要排序的数据文件上传到 Hadoop 分布式文件体系(HDFS)的某个目录下。
  • 在终端或命令行中实行以下命令,运行 MapReduce 任务:
bash
  1. hadoop jar sort.jar SortDriver /input /output
复制代码

其中,/input 是输入数据文件所在的路径,/output 是排序结果输出的路径。


  • 任务实行完成后,可以在 HDFS 的 /output 目录下查看排序后的结果文件。
以下是一个使用Hadoop 和 MapReduce 案例代码,用于盘算文本文件中每个单词的出现频率,并按照频率降序排序:



  • 创建一个 Java 项目,并在项目中创建以下四个文件:

    • WordCountMapper.java:Mapper 类,用于将输入的文本行拆分成单词,并输出键值对。
    • WordCountReducer.java:Reducer 类,用于接收 mapper 输出的键值对,盘算每个单词的出现次数,并按照频率降序排序。
    • Partitioner.java:Partitioner 类,用于根据单词的哈希值举行分区,确保相同单词被分配到同一个 Reducer。
    • Driver.java:Driver 类,用于设置任务参数并提交任务到 Hadoop 集群。在 WordCountMapper.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  7.     private final static IntWritable one = new IntWritable(1);
  8.     private Text word = new Text();
  9.     @Override
  10.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  11.         // 将输入的一行文本转换为字符串
  12.         String line = value.toString();
  13.         // 按空格分割字符串,得到单词数组
  14.         String[] words = line.split(" ");
  15.         // 遍历单词数组,对每个单词进行处理
  16.         for (String w : words) {
  17.             word.set(w);
  18.             // 输出键值对,键为单词,值为 1
  19.             context.write(word, one);
  20.         }
  21.     }
  22. }
复制代码

在 WordCountReducer.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.TreeMap;
  8. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  9.     private Map<String, Integer> wordCount = new HashMap<>();
  10.     @Override
  11.     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  12.         int count = 0;
  13.         // 遍历从 Mapper 传递过来的同一个单词对应的所有出现次数的值,进行累加求和
  14.         for (IntWritable val : values) {
  15.             count += val.get();
  16.         }
  17.         // 将单词及其出现次数存储在 wordCount 中
  18.         wordCount.put(key.toString(), count);
  19.     }
  20.     @Override
  21.     protected void cleanup(Context context) throws IOException, InterruptedException {
  22.         // 创建一个 TreeMap,按照单词出现次数降序排序
  23.         TreeMap<String, Integer> sortedWordCount = new TreeMap<>((a, b) -> wordCount.get(b) - wordCount.get(a));
  24.         sortedWordCount.putAll(wordCount);
  25.         // 遍历排序后的 TreeMap,输出单词及其出现次数
  26.         for (Map.Entry<String, Integer> entry : sortedWordCount.entrySet()) {
  27.             context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
  28.         }
  29.     }
  30. }
复制代码

在 Partitioner.java 文件中,编写以下代码:

java
  1. import org.apache.hadoop.io.Text;
  2. import org.apache.hadoop.mapreduce.Partitioner;
  3. public class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, IntWritable> {
  4.     @Override
  5.     public int getPartition(Text key, IntWritable value, int numPartitions) {
  6.         // 根据单词的哈希值对分区数量取模,确保相同单词被分配到同一个Reducer
  7.         return Math.abs(key.hashCode() % numPartitions);
  8.     }
  9. }
复制代码

在 Driver.java 文件中,编写以下代码:

java
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class Driver {
  9.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  10.         // 创建配置对象
  11.         Configuration conf = new Configuration();
  12.         // 创建 Job 实例
  13.         Job job = Job.getInstance(conf);
  14.         // 设置 Job 的主类,即包含 main 方法的类
  15.         job.setJarByClass(Driver.class);
  16.         // 设置 Mapper 类
  17.         job.setMapperClass(WordCountMapper.class);
  18.         // 设置 Reducer 类
  19.         job.setReducerClass(WordCountReducer.class);
  20.         // 设置 Partitioner 类
  21.         job.setPartitionerClass(Partitioner.class);
  22.         // 设置输出的键类型
  23.         job.setOutputKeyClass(Text.class);
  24.         // 设置输出的值类型
  25.         job.setOutputValueClass(IntWritable.class);
  26.         // 设置输入文件路径
  27.         FileInputFormat.addInputPath(job, new Path(args[0]));
  28.         // 设置输出文件路径
  29.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  30.         // 提交任务,并等待任务完成
  31.         boolean result = job.waitForCompletion(true);
  32.         System.exit(result? 0 : 1);
  33.     }
  34. }
复制代码

  • 编译并打包代码,天生 JAR 文件。
  • 将必要处置惩罚的文本文件上传到 Hadoop 分布式文件体系(HDFS)的某个目录下。
  • 在终端或命令行中实行以下命令,运行 MapReduce 任务:
bash
  1. hadoop jar wordcount.jar Driver /input /output
复制代码
其中,/input 是输入数据文件所在的路径,/output 是排序结果输出的路径。
任务实行完成后,可以在 HDFS 的 /output 目录下查看排序后的结果文件。

九、以下是一个使用 MapReduce 举行数据去重的案例代码:



  • 创建一个 Java 项目,并在项目中创建以下三个文件:

    • DataDeduplicationMapper.java:Mapper 类,用于将输入的数据转换为键值对。
    • DataDeduplicationReducer.java:Reducer 类,用于接收 mapper 输出的键值对,举行去重操纵。
    • DataDeduplicationDriver.java:Driver 类,用于设置任务参数并提交任务到 Hadoop 集群在 

DataDeduplicationMapper.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. public class DataDeduplicationMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  6.     @Override
  7.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  8.         // 将输入的一行文本转换为字符串
  9.         String line = value.toString();
  10.         // 将字符串作为键,值设置为 1
  11.         context.write(new Text(line), new LongWritable(1));
  12.     }
  13. }
复制代码


  • 在 DataDeduplicationReducer.java 文件中,编写以下代码:
java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.util.HashSet;
  5. import java.util.Set;
  6. public class DataDeduplicationReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  7.     private Set<String> uniqueData = new HashSet<>();
  8.     @Override
  9.     protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  10.         // 检查键是否已经存在于 uniqueData 集合中
  11.         if (!uniqueData.contains(key.toString())) {
  12.             // 如果键不存在,则将其添加到集合中,并输出键值对
  13.             uniqueData.add(key.toString());
  14.             context.write(key, new LongWritable(1));
  15.         }
  16.     }
  17. }
复制代码


  • 在 DataDeduplicationDriver.java 文件中,编写以下代码:
java
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class DataDeduplicationDriver {
  9.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  10.         // 创建配置对象
  11.         Configuration conf = new Configuration();
  12.         // 创建 Job 实例
  13.         Job job = Job.getInstance(conf);
  14.         // 设置 Job 的主类,即包含 main 方法的类
  15.         job.setJarByClass(DataDeduplicationDriver.class);
  16.         // 设置 Mapper 类
  17.         job.setMapperClass(DataDeduplicationMapper.class);
  18.         // 设置 Reducer 类
  19.         job.setReducerClass(DataDeduplicationReducer.class);
  20.         // 设置输出的键类型
  21.         job.setOutputKeyClass(Text.class);
  22.         // 设置输出的值类型
  23.         job.setOutputValueClass(LongWritable.class);
  24.         // 设置输入文件路径
  25.         FileInputFormat.addInputPath(job, new Path(args[0]));
  26.         // 设置输出文件路径
  27.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  28.         // 提交任务,并等待任务完成
  29.         boolean result = job.waitForCompletion(true);
  30.         System.exit(result? 0 : 1);
  31.     }
  32. }
复制代码

  • 编译并打包代码,天生 JAR 文件。
  • 将必要去重的数据文件上传到 Hadoop 分布式文件体系(HDFS)的某个目录下。
  • 在终端或命令行中实行以下命令,运行 MapReduce 任务:
收起
bash
  1. hadoop jar data-deduplication.jar DataDeduplicationDriver /input /output
复制代码
其中,/input 是输入数据文件所在的路径,/output 是去重结果输出的路径。

  • 任务实行完成后,可以在 HDFS 的 /output 目录下查看去重后的结果文件。
十、以下是一个使用 MapReduce 对 CSV 文件数据举行统计的案例代码:



  • 创建一个 Java 项目,并在项目中创建以下四个文件:

    • CsvMapper.java:Mapper 类,用于读取 CSV 文件并将每行数据转换为键值对。
    • CsvReducer.java:Reducer 类,用于接收 mapper 输出的键值对,举行数据统计。
    • CsvPartitioner.java:Partitioner 类,用于根据键的字段举行分区,确保相同字段的数据被分配到同一个 Reducer。
    • CsvDriver.java:Driver 类,用于设置任务参数并提交任务到 Hadoop 集群。

在 CsvMapper.java 文件中,编写以下代码:

java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.util.StringTokenizer;
  6. public class CsvMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  7.     private Text field = new Text();
  8.     @Override
  9.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  10.         // 将输入的一行 CSV 数据转换为字符串
  11.         String line = value.toString();
  12.         // 使用 StringTokenizer 类按逗号分割字符串
  13.         StringTokenizer tokenizer = new StringTokenizer(line, ",");
  14.         // 遍历分割后的字段
  15.         while (tokenizer.hasMoreTokens()) {
  16.             // 取出每个字段并设置为键
  17.             String fieldValue = tokenizer.nextToken();
  18.             field.set(fieldValue);
  19.             // 输出键值对,值为 1
  20.             context.write(field, new LongWritable(1));
  21.         }
  22.     }
  23. }
复制代码

  • 在 CsvReducer.java 文件中,编写以下代码:
java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. public class CsvReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  7.     private Map<String, Long> stats = new HashMap<>();
  8.     @Override
  9.     protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  10.         long count = 0;
  11.         // 遍历 values,累加每个字段的出现次数
  12.         for (LongWritable value : values) {
  13.             count += value.get();
  14.         }
  15.         // 将字段和统计结果存储到 stats 中
  16.         stats.put(key.toString(), count);
  17.     }
  18.     @Override
  19.     protected void cleanup(Context context) throws IOException, InterruptedException {
  20.         // 遍历 stats 并输出每个字段及其统计结果
  21.         for (Map.Entry<String, Long> entry : stats.entrySet()) {
  22.             context.write(new Text(entry.getKey()), new LongWritable(entry.getValue()));
  23.         }
  24.     }
  25. }
复制代码


  • 在 CsvPartitioner.java 文件中,编写以下代码:
java
  1. import org.apache.hadoop.io.Text;
  2. import org.apache.hadoop.mapreduce.Partitioner;
  3. public class CsvPartitioner extends Partitioner<Text, LongWritable> {
  4.     @Override
  5.     public int getPartition(Text key, LongWritable value, int numPartitions) {
  6.         // 根据键的第一个字段进行分区
  7.         return key.toString().split(",")[0].hashCode() % numPartitions;
  8.     }
  9. }
复制代码


  • 在 CsvDriver.java 文件中,编写以下代码:
java
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class CsvDriver {
  9.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  10.         // 创建配置对象
  11.         Configuration conf = new Configuration();
  12.         // 创建 Job 实例
  13.         Job job = Job.getInstance(conf);
  14.         // 设置 Job 的主类,即包含 main 方法的类
  15.         job.setJarByClass(CsvDriver.class);
  16.         // 设置 Mapper 类
  17.         job.setMapperClass(CsvMapper.class);
  18.         // 设置 Reducer 类
  19.         job.setReducerClass(CsvReducer.class);
  20.         // 设置 Partitioner 类
  21.         job.setPartitionerClass(CsvPartitioner.class);
  22.         // 设置输出的键类型
  23.         job.setOutputKeyClass(Text.class);
  24.         // 设置输出的值类型
  25.         job.setOutputValueClass(LongWritable.class);
  26.         // 设置输入文件路径
  27.         FileInputFormat.addInputPath(job, new Path(args[0]));
  28.         // 设置输出文件路径
  29.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  30.         // 提交任务,并等待任务完成
  31.         boolean result = job.waitForCompletion(true);
  32.         System.exit(result? 0 : 1);
  33.     }
  34. }
复制代码


  • 编译并打包代码,天生 JAR 文件。
  • 将必要统计的 CSV 文件上传到 Hadoop 分布式文件体系(HDFS)的某个目录下。
  • 在终端或命令行中实行以下命令,运行 MapReduce 任务:
bash
  1. hadoop jar csv-statistics.jar CsvDriver /input /output
复制代码

其中,/input 是输入 CSV 文件所在的路径,/output 是统计结果输出的路径。


  • 任务实行完成后,可以在 HDFS 的 /output 目录下查看统计结果文件。
十一、要查看 Hadoop 与 MapReduce 案例的运行结果,你可以按照以下步骤举行操纵:



  • 确保你的 Hadoop 集群已经正确安装和配置,而且你已经成功运行了 MapReduce 任务。
  • 在任务完成后,Hadoop 会将结果输出到指定的输出目录。你可以通过命令行或 Hadoop 的文件体系接口来查看输出目录中的内容。
  • 使用以下命令查看输出目录中的文件列表:
plaintext
  1.    hdfs dfs -ls /output
复制代码
其中,/output 是你在代码中指定的输出目录路径。

  • 查看输出文件的内容。你可以使用以下命令查看输出文件的内容:
plaintext
  1.    hdfs dfs -cat /output/part-r-00000
复制代码
其中,part-r-00000 是输出文件的名称,大概会有所差别。

  • 分析输出结果。输出文件的内容将包罗每个单词及其出现次数的信息。你可以根据必要进一步处置惩罚和分析这些结果。
请注意,具体的命令和操纵大概因你的 Hadoop 安装和配置而有所差别。上述步骤提供了一样平常的引导,你大概必要根据现实情况举行得当的调整。
十二、总结与展望


本文循大数据浪潮脉络,深挖 Hadoop 与 MapReduce 技能精华,解锁匀称数盘算密码,从理论原理到实战代码,全方位拆解剖析。Hadoop 生态凭借分布式存储、盘算上风,已然成为大数据处置惩罚 “中流砥柱”,MapReduce 模子高效并行处置惩罚特质,更是加快数据代价释放。实战案例彰显技能实操可行性,助力企业、机构从容应对数据挑衅。展望未来,技能迭代日新月异,人工智能、实时盘算与 Hadoop 深度融合,必将拓展应用界限,解锁更多数据潜能,引领各行业迈向数据驱动的智能化新征程。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表