hadoop MapReduce运营商案例关于用户基站停留数据统计

打印 上一主题 下一主题

主题 517|帖子 517|积分 1551



如果需要文件和代码的话可评论区留言邮箱,我给你发源代码
本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html
实验要求

统计每个用户在不同时段中各个基站的停留时间。
1.功能描述

用户的手机,连接到不同的基站会产生一条记录。
数据格式为:用户标识 设备标识 基站位置 通讯的日期 通讯时间
example: 0000009999 0054785806 00000089 2016-02-21 21:55:37
需要得到的数据格式为:
用户标识 时段 基站位置 停留时间
example: 0000000001 09-18 00000003 15
用户0000000001在09-18点这个时间段在基站00000003停留了15分钟
2.实现思路

程序运行支持传入时间段,比如“09-18-24”,表示分为0点到9点,9点到18点,18点到24点三个时间段。

  • (1)Mapper阶段
    对输入的数据,算出它属于哪个时间段。
    k1:每行记录在文本中的偏移量。
    v2:一条记录
    k2用“用户ID,时间段”输出。
    v2用“基站位置,时间”。时间用unix time
  • (2)Reducer阶段
    对获取的v3(v3是一个集合,每个元素是v2,相当于按照k2对v2分组)进行排序,以时间升序排序。
    计算两两之间的时间间隔,保存到另一个集合中,两个不同的时间间隔中,从基站A移动到基站B,这样获取到在A基站的停留的时间。
    同理从基站B移动到基站C,基站C移动到基站D,依次类推,所有的时间都获取到。再把时间累加起来,就可以获取到总的时间。
本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html
代码实现

PhoneMain.java
  1. package phoneMapReduce;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.io.IOException;
  9. /**
  10. * Created by ue50 on 11/13/19.
  11. */
  12. public class PhoneMain
  13. {
  14.     public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException
  15.     {
  16.         //String.equals()比较字符串的值是否相同
  17.         if(args == null || "0".equals(args[0]))
  18.         {
  19.             throw new RuntimeException("argument is not right!");
  20.         }
  21.         //Configuration是作业的配置信息类
  22.         Configuration configuration = new Configuration();
  23.         //set(String name, String value)设置配置项
  24.         configuration.set("timeRange", args[0]);
  25.         Job job = Job.getInstance(configuration);
  26.         job.setJarByClass(PhoneMain.class);
  27.         job.setMapperClass(PhoneMapper.class);
  28.         job.setMapOutputKeyClass(Text.class);
  29.         job.setMapOutputKeyClass(Text.class);
  30.         job.setReducerClass(PhoneReducer.class);
  31.         job.setOutputKeyClass(Text.class);
  32.         job.setOutputValueClass(Text.class);
  33.         //FileInputFormat.setInputPaths(job, new Path("hdfs://xdata-m0:8020/user/ue50/pos.txt"));
  34.         //FileOutputFormat.setOutputPath(job, new Path("hdfs://xdata-m0:8020/user/ue50/out"));
  35.         FileInputFormat.setInputPaths(job, new Path(args[1]));
  36.         FileOutputFormat.setOutputPath(job, new Path(args[2]));
  37.         job.waitForCompletion(true);
  38.     }
  39. }
复制代码
Mapper阶段
PhoneMapper.java
  1. package phoneMapReduce;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. * Created by ue50 on 11/13/19.
  9. */
  10. public class PhoneMapper extends Mapper<LongWritable, Text, Text, Text>
  11. {
  12.     private int[] timeRangeList;
  13.     @Override
  14.     //setup()被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作
  15.     protected void setup(Context context) throws IOException,InterruptedException
  16.     {
  17.         //Configuration是作业的配置信息类,通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息
  18.         Configuration configuration = context.getConfiguration();
  19.         //get(String name)根据配置项的键name获取相应的值
  20.         String timeRange = configuration.get("timeRange");//运行时传入的时间段,比如“09-18-24”
  21.         String[] timeRangeString = timeRange.split("-");
  22.         timeRangeList = new int[timeRangeString.length];
  23.         for(int i = 0; i < timeRangeString.length;i++)
  24.         {
  25.             //timeRangeList数组保存传入的时间,如:09、18、24
  26.             timeRangeList[i] = Integer.parseInt(timeRangeString[i]);
  27.         }
  28.     }
  29.     @Override
  30.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  31.     {
  32.         String values[] = value.toString().split("\\s+");//对一条记录"用户标识 设备标识   基站位置 通讯的时间"按空格拆分
  33.         String userId = values[0];//用户标识
  34.         String baseStation = values[2];//基站位置
  35.         String timeString = values[4];//访问时间,如:21:55:37
  36.         String[] times = timeString.split(":");//对访问时间按':'拆分
  37.         int hour = Integer.parseInt(times[0]);//小时
  38.         //startHour、endHour时间段的起止时间
  39.         int startHour = 0;
  40.         int endHour = 0;
  41.         for(int i = 0; i < timeRangeList.length; i++)
  42.         {
  43.             if(hour < timeRangeList[i])
  44.             {
  45.                 if(i == 0)
  46.                 {
  47.                     startHour = 0;
  48.                 }
  49.                 else
  50.                 {
  51.                     startHour = timeRangeList[i-1];
  52.                 }
  53.                 endHour = timeRangeList[i];
  54.                 break;
  55.             }
  56.         }
  57.         if(startHour == 0 && endHour == 0)
  58.         {
  59.             return;
  60.         }
  61.         //k2:用户标识  时间段  v2:基站位置-访问时间
  62.         context.write(new Text(userId + "\t" + startHour + "-" + endHour + "\t"), new Text(baseStation + "-" + timeString));
  63.     }
  64. }
复制代码
Reducer阶段
[code]package phoneMapReduce;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.text.DateFormat;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.*;/** * Created by ue50 on 11/13/19. */public class PhoneReducer extends Reducer{    @Override    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException    {        List valueList = new LinkedList();//基于链表的动态数组        //Map是一种把键对象和值对象映射的集合,TreeMap是一个有序的key-value集合,        //它是通过红黑树实现的,TreeMap中的元素默认按照key的自然排序排列        Map residenceTimeMap = new TreeMap();        for(Text value : values)        {            String item = value.toString();            valueList.add(item);//"基站位置-访问时间"的集合        }        if(valueList == null || valueList.size()
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表