Hadoop中MapReduce过程中Shuffle过程实现自定义排序

铁佛  金牌会员 | 2024-12-28 21:19:12 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 822|帖子 822|积分 2466

Hadoop中MapReduce过程中Shuffle过程实现自定义排序


一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将雷同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的实行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但偶然我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。
二、实现WritableComparable接口

1、自定义Key类

起首,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。
  1. package mr;
  2. import org.apache.hadoop.io.WritableComparable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. public class Employee implements WritableComparable<Employee> {
  7.     private int empno;
  8.     private String ename;
  9.     private String job;
  10.     private int mgr;
  11.     private String hiredate;
  12.     private int sal;
  13.     private int comm;
  14.     private int deptno;
  15.     @Override
  16.     public String toString(){
  17.         return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";
  18.     }
  19.     @Override
  20.     public int compareTo(Employee o) {
  21.         // 多个列的排序:select * from emp order by deptno,sal;
  22.         // 首先按照deptno排序
  23.         if(this.deptno > o.getDeptno()){
  24.             return 1;
  25.         }else if(this.deptno < o.getDeptno()){
  26.             return -1;
  27.         }
  28.         // 如果deptno相等,按照sal排序
  29.         if(this.sal >= o.getSal()){
  30.             return 1;
  31.         }else{
  32.             return -1;
  33.         }
  34.     }
  35.     @Override
  36.     public void write(DataOutput output) throws IOException {
  37.         // 序列化
  38.         output.writeInt(this.empno);
  39.         output.writeUTF(this.ename);
  40.         output.writeUTF(this.job);
  41.         output.writeInt(this.mgr);
  42.         output.writeUTF(this.hiredate);
  43.         output.writeInt(this.sal);
  44.         output.writeInt(this.comm);
  45.         output.writeInt(this.deptno);
  46.     }
  47.     @Override
  48.     public void readFields(DataInput input) throws IOException {
  49.         // 反序列化
  50.         this.empno = input.readInt();
  51.         this.ename = input.readUTF();
  52.         this.job = input.readUTF();
  53.         this.mgr = input.readInt();
  54.         this.hiredate = input.readUTF();
  55.         this.sal = input.readInt();
  56.         this.comm = input.readInt();
  57.         this.deptno = input.readInt();
  58.     }
  59. }
复制代码
三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法答应我们在不修改Key类的情况下实现自定义排序。
  1. package mr;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class CustomSort {
  9.     public static class Map extends Mapper<Object, Text, Employee, IntWritable> {
  10.         private static Employee emp = new Employee();
  11.         private static IntWritable one = new IntWritable(1);
  12.         @Override
  13.         protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  14.             String[] line = value.toString().split("\t");
  15.             emp.setEmpno(Integer.parseInt(line[0]));
  16.             emp.setEname(line[1]);
  17.             emp.setJob(line[2]);
  18.             emp.setMgr(Integer.parseInt(line[3]));
  19.             emp.setHiredate(line[4]);
  20.             emp.setSal(Integer.parseInt(line[5]));
  21.             emp.setComm(Integer.parseInt(line[6]));
  22.             emp.setDeptno(Integer.parseInt(line[7]));
  23.             context.write(emp, one);
  24.         }
  25.     }
  26.     public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {
  27.         @Override
  28.         protected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  29.             for (IntWritable val : values) {
  30.                 context.write(key, val);
  31.             }
  32.         }
  33.     }
  34.     public static void main(String[] args) throws Exception {
  35.         Configuration conf = new Configuration();
  36.         Job job = Job.getInstance(conf, "CustomSort");
  37.         job.setJarByClass(CustomSort.class);
  38.         job.setMapperClass(Map.class);
  39.         job.setReducerClass(Reduce.class);
  40.         job.setOutputKeyClass(Employee.class);
  41.         job.setOutputValueClass(IntWritable.class);
  42.         // 设置自定义排序器
  43.         job.setSortComparatorClass(EmployeeComparator.class);
  44.         
  45.         Path in = new Path("hdfs://localhost:9000/mr/in/customsort");
  46.         Path out = new Path("hdfs://localhost:9000/mr/out/customsort");
  47.         FileInputFormat.addInputPath(job, in);
  48.         FileOutputFormat.setOutputPath(job, out);
  49.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  50.     }
  51. }
复制代码
3、自定义排序器类

  1. package mr;
  2. import org.apache.hadoop.io.WritableComparable;
  3. import org.apache.hadoop.io.WritableComparator;
  4. public class EmployeeComparator extends WritableComparator {
  5.     protected EmployeeComparator() {
  6.         super(Employee.class, true);
  7.     }
  8.     @Override
  9.     public int compare(WritableComparable w1, WritableComparable w2) {
  10.         Employee e1 = (Employee) w1;
  11.         Employee e2 = (Employee) w2;
  12.         // 首先按照deptno排序
  13.         int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());
  14.         if (deptCompare != 0) {
  15.             return deptCompare;
  16.         }
  17.         // 如果deptno相等,按照sal排序
  18.         return Integer.compare(e1.getSal(), e2.getSal());
  19.     }
  20. }
复制代码
四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator。
五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,答应我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和正确性。

版权声明:本博客内容为原创,转载请保留原文链接及作者信息。
参考文章


  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

铁佛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表