Implementing Custom Sorting in the Shuffle Phase of Hadoop MapReduce
I. Introduction
The Shuffle phase of the MapReduce framework is the bridge between the Map and Reduce stages: it groups and sorts the map output by key and delivers records with the same key to the corresponding Reduce task. Shuffle performance directly affects the execution efficiency of the whole MapReduce job. By default, Hadoop sorts map output by the key's natural ordering, i.e. the key class's compareTo method; sometimes, however, we need to sort according to business-specific logic. This article introduces two ways to implement custom sorting: implementing the WritableComparable interface, and calling Job.setSortComparatorClass. Detailed steps and code examples follow.
II. Implementing the WritableComparable Interface
1. The Custom Key Class
First, define a class that implements the WritableComparable interface. The interface requires a compareTo method, which defines the sort order.
package mr;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    // Getters and setters for all fields are omitted for brevity;
    // the mapper and comparator below rely on them.

    @Override
    public String toString() {
        return "Employee[empno=" + empno + ",ename=" + ename + ",sal=" + sal + ",deptno=" + deptno + "]";
    }

    @Override
    public int compareTo(Employee o) {
        // Multi-column ordering: select * from emp order by deptno, sal;
        // First compare by deptno
        int deptCompare = Integer.compare(this.deptno, o.getDeptno());
        if (deptCompare != 0) {
            return deptCompare;
        }
        // If deptno is equal, compare by sal. Using Integer.compare (rather
        // than returning 1 on >=) correctly returns 0 for equal keys,
        // which the comparator contract requires.
        return Integer.compare(this.sal, o.getSal());
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialization: field order must match readFields
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialization: same field order as write
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }
}
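The compareTo logic above mirrors the SQL ordering `order by deptno, sal`. As a quick Hadoop-free sketch (the `Emp` record and its sample values are hypothetical, purely for illustration), the same two-level ordering can be expressed with `java.util.Comparator`:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortSketch {
    // Hypothetical plain record standing in for the Hadoop Employee key
    record Emp(int empno, int sal, int deptno) {}

    // ORDER BY deptno, sal — the same two-level ordering as Employee.compareTo
    static final Comparator<Emp> BY_DEPT_THEN_SAL =
            Comparator.comparingInt(Emp::deptno).thenComparingInt(Emp::sal);

    public static void main(String[] args) {
        List<Emp> emps = new ArrayList<>(List.of(
                new Emp(7499, 1600, 30),
                new Emp(7369, 800, 20),
                new Emp(7566, 2975, 20)));
        emps.sort(BY_DEPT_THEN_SAL);
        // Dept 20 rows come first (sal ascending), then dept 30
        for (Emp e : emps) {
            System.out.println(e.empno() + " dept=" + e.deptno() + " sal=" + e.sal());
        }
    }
}
```

Returning 0 when both columns are equal matters in the Shuffle: keys that compare as equal are grouped into the same reduce call.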
III. Using Job.setSortComparatorClass
2. Setting a Custom Sort Comparator
Besides implementing WritableComparable, we can call Job.setSortComparatorClass to register a custom comparator. This approach lets us customize the sort order without modifying the key class.
package mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomSort {
    public static class Map extends Mapper<Object, Text, Employee, IntWritable> {
        // Reusing one Employee instance is safe here: context.write
        // serializes the key immediately
        private static Employee emp = new Employee();
        private static IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            emp.setEmpno(Integer.parseInt(line[0]));
            emp.setEname(line[1]);
            emp.setJob(line[2]);
            emp.setMgr(Integer.parseInt(line[3]));
            emp.setHiredate(line[4]);
            emp.setSal(Integer.parseInt(line[5]));
            emp.setComm(Integer.parseInt(line[6]));
            emp.setDeptno(Integer.parseInt(line[7]));
            context.write(emp, one);
        }
    }

    public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {
        @Override
        protected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomSort");
        job.setJarByClass(CustomSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Employee.class);
        job.setOutputValueClass(IntWritable.class);
        // Register the custom sort comparator
        job.setSortComparatorClass(EmployeeComparator.class);

        Path in = new Path("hdfs://localhost:9000/mr/in/customsort");
        Path out = new Path("hdfs://localhost:9000/mr/out/customsort");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
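The mapper above assumes each input line is a tab-separated emp record with eight fields. A minimal standalone sketch of that parsing step (the sample line is illustrative, not taken from the article's dataset):

```java
public class ParseSketch {
    public static void main(String[] args) {
        // Field layout the mapper expects:
        // empno  ename  job  mgr  hiredate  sal  comm  deptno
        String line = "7499\tALLEN\tSALESMAN\t7698\t1981-02-20\t1600\t300\t30";
        String[] f = line.split("\t");
        // Numeric columns are parsed with Integer.parseInt, as in map()
        System.out.println("fields=" + f.length
                + " empno=" + Integer.parseInt(f[0])
                + " deptno=" + Integer.parseInt(f[7]));
    }
}
```

Note that `Integer.parseInt` throws on an empty string, so records with a missing `comm` column would need guarding in a real job.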
3. The Custom Comparator Class
package mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeComparator extends WritableComparator {
    protected EmployeeComparator() {
        // true: instantiate keys so compare() receives deserialized objects
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Employee e1 = (Employee) w1;
        Employee e2 = (Employee) w2;
        // First compare by deptno
        int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());
        if (deptCompare != 0) {
            return deptCompare;
        }
        // If deptno is equal, compare by sal
        return Integer.compare(e1.getSal(), e2.getSal());
    }
}
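EmployeeComparator duplicates the two-level comparison in Employee.compareTo. A small Hadoop-free check (the `cmp` helper is hypothetical, written only to exercise the logic) that the `Integer.compare` chain behaves as a comparator should:

```java
public class CompareCheck {
    // Same two-level comparison as EmployeeComparator.compare, on bare ints
    static int cmp(int deptno1, int sal1, int deptno2, int sal2) {
        int dept = Integer.compare(deptno1, deptno2);
        return dept != 0 ? dept : Integer.compare(sal1, sal2);
    }

    public static void main(String[] args) {
        System.out.println(cmp(20, 800, 30, 1600)); // negative: dept 20 sorts before dept 30
        System.out.println(cmp(20, 2975, 20, 800)); // positive: same dept, higher sal sorts later
        System.out.println(cmp(20, 800, 20, 800));  // 0: equal keys are grouped in one reduce call
    }
}
```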
IV. Usage Example
The job above is a complete MapReduce example showing the Shuffle sort in action: it uses the custom Employee class as the map output key and registers the custom EmployeeComparator as the sort comparator.
V. Summary
By implementing the WritableComparable interface or calling Job.setSortComparatorClass, we can customize sorting in the Shuffle phase of Hadoop MapReduce. The two approaches provide a flexible sorting mechanism that lets us order data according to different business requirements, improving both the efficiency and the correctness of data processing.
Copyright notice: this post is original content; when reposting, please keep the original link and author information.