MapReduce,Yarn,Spark理解与实行流程

打印 上一主题 下一主题

主题 1027|帖子 1027|积分 3081

MapReduce的API理解

Mapper



  • 如果是单词计数:hello:1, hello:1, world:1
  1. public void map(Object key, // 首字符偏移量
  2.                     Text value, // 文件的一行内容
  3.                     Context context) // Mapper端的上下文,与 OutputCollector 和 Reporter 的功能类似
  4.             throws IOException, InterruptedException {
复制代码
Reduce



  • 注意拿到的是value的集合
  • 如果是单词计数:hello:【1,1】,world:【1】
  1.   public void reduce(Text key, // Map端 输出的 key 值
  2.                        Iterable<IntWritable> values, // Map端 输出的 Value 集合(相同key的集合)
  3.                        Context context) // Reduce 端的上下文,与 OutputCollector 和 Reporter 的功能类似
  4.             throws IOException, InterruptedException
  5.     {
复制代码
整体架构




  • NameNode: 负责存储文件的元信息(如文件的巨细、块信息、存储位置等)。通常会摆设一个 Secondary NameNode 来周期性归并 NameNode 的 edit log 以减少规复时间,但它并不是热备,而是辅助管理。避免单点故障
  • JobTracker:负责 Hadoop MapReduce 任务的调度和资源管理,接收作业请求并将任务分配给不同的 TaskTracker(工作节点)
  • DataNode:实际存储数据的节点,存有多个数据块(Block),128MB
  • TaskTracker:实际举行mapper和reduce工作的节点,可以看到图中TaskTracker接近DataNode,这是“移动计算比移动数据更自制” 的筹划理念,目标是减少数据传输,进步计算服从
输入阶段


  • 文件存储到 HDFS(Hadoop Distributed File System)
  • 当文件上传到 HDFS 时,文件会先到达 NameNode,NameNode 负责存储文件的元信息(如文件的巨细、块信息、存储位置等)。
  • 随后,文件会被分割成固定巨细的数据块(Block),默认每块巨细是 128 MB(可以通过配置调整)。
  • 这些数据块会被分布式地存储到集群中的不同 DataNode 节点上,通常每个块有多个副本(默认是 3 个),以包管数据的可靠性,同步到指定数量的副本后,才向NameNode确认写操作成功,包管一致性。TiDB的底层TiKV的存储也类似这个结构,根据键值对分为多个region,且不同的节点生存奇数个副本,并有raft协议包管一致性。
Mapper阶段



  • TaskTracker 和 Mapper 的运行:

    • 当实行 MapReduce 作业时,JobTracker会负责任务的调度。
    • 根据 NameNode 提供的块位置信息,JobTracker 会在包罗该块数据的 DataNode 上启动 Mapper,这是数据本地化优化的焦点。
    • 每个 Mapper 会处理一个或多个数据块。
    • Mapper会将每一行的文件处理成键值对形式,也可以举行数据预处理(过滤、清洗):

  1. // input.txt:
  2. apple 10
  3. banana 20
  4. apple 5
  5. // after mapper:
  6. (apple, 5), (apple, 10), (banana, 20)
复制代码

  • Mapper 的输出结果存储在本地磁盘的缓存文件中或者磁盘中,并分为多个分区,每个分区对应一个 Reducer。
Shuffle 阶段

什么是 Shuffle?

Shuffle 是将 Mapper 输出的中间数据(键值对)分发给 Reducer 的过程。
其主要任务包罗:

  • 对 Mapper 输出的数据举行分区。
  • 将分区数据从 Mapper 节点移动到 Reducer 节点。
  • 对分区数据举行排序和归并。
Shuffle 的实行角色



  • Mapper 节点实行分区:

    • 在 Mapper 阶段竣过后,输出结果会被分区(默认使用 HashPartitioner)。
    • 每个分区对应一个 Reducer。Mapper 的数据会按照分区规则存储到本地磁盘的多个文件中。

  • JobTracker(或 Yarn)负责和谐:

    • JobTracker 会关照各 Reducer 节点**从相应的 Mapper 节点拉取(pull)属于自己的分区数据。

  • Reducer 节点的数据迁移:

    • 每个 Reducer 从多个 Mapper 节点拉取自己的数据。
    • 拉取的数据会临时存储在 Reducer 节点上,并在内存中举行排序和归并,以便 Reducer 处理。

Shuffle 阶段会涉及到数据从 Mapper 节点到 Reducer 节点的迁移,这也是整个 MapReduce 流程中最耗时的一部分。
举个例子,更好理解如何分区以及数据传输:
Shuffle例子

输入文件内容:
  1. File1: Hello Hadoop Hello
  2. File2: Hadoop MapReduce Hadoop
复制代码
分块:


  • Block1(File1)
  • Block2(File2)
Mapper 输出:

假设有 2 个 Mapper 和 2 个 Reducer,并用 key.hashCode() % 2 作为分区规则。
MapperKeyPartition (Reducer)OutputMapper1Hello0{Hello: 1}Mapper1Hadoop1{Hadoop: 1}Mapper1Hello0{Hello: 1}Mapper2Hadoop1{Hadoop: 1}Mapper2MapReduce0{MapReduce: 1}Mapper2Hadoop1{Hadoop: 1} Shuffle 阶段:

在 Shuffle 阶段,每个 Reducer 会从多个 Mapper 拉取数据:
ReducerData SourceReceived DataReducer0Mapper1 + Mapper2{Hello: [1, 1], MapReduce: [1]}Reducer1Mapper1 + Mapper2{Hadoop: [1, 1, 1]} Reducer 聚合:

终极,Reducer 聚合数据,输出结果:
  1. Reducer0: {Hello: 2, MapReduce: 1}
  2. Reducer1: {Hadoop: 3}
复制代码
Reduce阶段

前面提到,reduce时会向map的节点获取数据,它是如何直到谁人mapper节点的呢?
详细是这样的:map任务成功完成后,它们会使用心跳机制关照它们JobTracker。因此,对于指定作业JobTracker 知道 map输出和主机位置之间的映射关系。reducer 中的一个线程定期扣问 JobTracker 以便获取 map输出主机的位置,直到获得所有输出位置。
Reduce 任务实行完毕后,输出结果会直接存储到 HDFS,但是Reduce 节点不会主动关照 NameNode 数据位置,而是 HDFS 负责数据存储的元数据管理,Reduce 任务会通过 HDFS 客户端 API 将数据写入 HDFS
写数据到 HDFS 的过程(详)


  • 客户端请求写入文件:
    客户端(例如 Reducer 或用户程序)向 HDFS 的 NameNode 发起写入请求。
    客户端须要告诉 NameNode 文件的元信息(如文件名、巨细等)
    NameNode 分配数据块
    NameNode 根据文件巨细、HDFS 的配置(如块巨细和副本数量),分配该文件须要的 数据块(Block)
    对于每个块,NameNode 会选择多个 DataNode(通常是 3 个)作为存储目标,并将这些位置信息返回给客户端。
  • 分配数据块:
    通常会优先选择离客户端最近的节点,或者是同一个机架的节点,来减少网络延迟。副本的存储节点也会尽量分布在不同的机架上,进步数据可靠性
    客户端直接写入 DataNode:客户端根据 NameNode 返回的块位置信息,开始向第一组目标 DataNode 写入数据。写入过程是 流式传输,数据被切分为块后,直接发送给第一个 DataNode
    DataNode 举行副本复制:第一台 DataNode 在接收到数据后,会立刻将该数据块传输到下一台 DataNode,依此类推,直到完成所有副本的写入(链式复制)
  • DataNode 汇报块信息:
    每个 DataNode 在数据写入完成后,会向 NameNode 汇报存储的块信息(如块 ID、块巨细、存储位置)
  • 写入完成:
    当所有数据块都写入成功,并且所有副本都存储完成后,HDFS 客户端关照 NameNode 文件写入完成,包管数据的一致性
    NameNode 将文件的元数据标志为 “完成状态”
MapReduce框架的Java实现

这里手写一个简易的java实现的框架,方便各人理解
  1. import java.util.*;
  2. import java.util.concurrent.*;
  3. import java.util.stream.Collectors;
  4. // 定义 Mapper 接口
  5. interface Mapper {
  6.     List<Pair<String, Integer>> map(String input);
  7. }
  8. // 定义 Reducer 接口
  9. interface Reducer {
  10.     Pair<String, Integer> reduce(String key, List<Integer> values);
  11. }
  12. // 定义 Pair 类,用于存储键值对
  13. class Pair<K, V> {
  14.     public final K key;
  15.     public final V value;
  16.     public Pair(K key, V value) {
  17.         this.key = key;
  18.         this.value = value;
  19.     }
  20.     @Override
  21.     public String toString() {
  22.         return key + ": " + value;
  23.     }
  24. }
  25. // 实现支持多个 Mapper 和 Reducer 的 MapReduce 框架
  26. class ParallelMapReduceFramework {
  27.     private List<Mapper> mappers;
  28.     private List<Reducer> reducers;
  29.     private int reducerCount;
  30.     public ParallelMapReduceFramework(List<Mapper> mappers, List<Reducer> reducers) {
  31.         this.mappers = mappers;
  32.         this.reducers = reducers;
  33.         this.reducerCount = reducers.size();
  34.     }
  35.     public Map<String, Integer> execute(List<String> inputs) throws InterruptedException, ExecutionException {
  36.         ExecutorService executor = Executors.newFixedThreadPool(mappers.size());
  37.         // 1. Map 阶段:将输入数据分给多个 Mapper 并行处理
  38.         List<Future<List<Pair<String, Integer>>>> mapResults = new ArrayList<>();
  39.         int chunkSize = inputs.size() / mappers.size();
  40.         for (int i = 0; i < mappers.size(); i++) {
  41.             int start = i * chunkSize;
  42.             int end = (i == mappers.size() - 1) ? inputs.size() : (i + 1) * chunkSize;
  43.             List<String> chunk = inputs.subList(start, end);
  44.             Mapper mapper = mappers.get(i);
  45.             mapResults.add(executor.submit(() -> {
  46.                 List<Pair<String, Integer>> results = new ArrayList<>();
  47.                 for (String input : chunk) {
  48.                     results.addAll(mapper.map(input));
  49.                 }
  50.                 return results;
  51.             }));
  52.         }
  53.         // 收集所有 Mapper 生成的键值对
  54.         List<Pair<String, Integer>> allMappedData = new ArrayList<>();
  55.         for (Future<List<Pair<String, Integer>>> future : mapResults) {
  56.             allMappedData.addAll(future.get());
  57.         }
  58.         // 2. Shuffle 阶段:将键值对分片,分配给不同的 Reducer
  59.         Map<Integer, List<Pair<String, Integer>>> reducerInput = new HashMap<>();
  60.         for (int i = 0; i < reducerCount; i++) {
  61.             reducerInput.put(i, new ArrayList<>());
  62.         }
  63.         for (Pair<String, Integer> pair : allMappedData) {
  64.             int reducerIndex = Math.abs(pair.key.hashCode() % reducerCount);
  65.             reducerInput.get(reducerIndex).add(pair);
  66.         }
  67.         // 3. Reduce 阶段:每个 Reducer 处理一个分片数据
  68.         List<Future<Map<String, Integer>>> reduceResults = new ArrayList<>();
  69.         for (int i = 0; i < reducers.size(); i++) {
  70.             int index = i;
  71.             Reducer reducer = reducers.get(i);
  72.             List<Pair<String, Integer>> inputForReducer = reducerInput.get(index);
  73.             reduceResults.add(executor.submit(() -> {
  74.                 // 按键分组
  75.                 Map<String, List<Integer>> groupedData = new HashMap<>();
  76.                 for (Pair<String, Integer> pair : inputForReducer) {
  77.                     groupedData.computeIfAbsent(pair.key, k -> new ArrayList<>()).add(pair.value);
  78.                 }
  79.                 // Reduce 操作
  80.                 Map<String, Integer> result = new HashMap<>();
  81.                 for (Map.Entry<String, List<Integer>> entry : groupedData.entrySet()) {
  82.                     result.put(entry.getKey(), reducer.reduce(entry.getKey(), entry.getValue()).value);
  83.                 }
  84.                 return result;
  85.             }));
  86.         }
  87.         // 收集所有 Reducer 的结果
  88.         Map<String, Integer> finalResult = new HashMap<>();
  89.         for (Future<Map<String, Integer>> future : reduceResults) {
  90.             finalResult.putAll(future.get());
  91.         }
  92.         executor.shutdown();
  93.         return finalResult;
  94.     }
  95. }
  96. // 实现单词统计的 Mapper 和 Reducer
  97. class WordCountMapper implements Mapper {
  98.     @Override
  99.     public List<Pair<String, Integer>> map(String input) {
  100.         String[] words = input.split("\\s+");
  101.         List<Pair<String, Integer>> result = new ArrayList<>();
  102.         for (String word : words) {
  103.             result.add(new Pair<>(word.toLowerCase(), 1));
  104.         }
  105.         return result;
  106.     }
  107. }
  108. class WordCountReducer implements Reducer {
  109.     @Override
  110.     public Pair<String, Integer> reduce(String key, List<Integer> values) {
  111.         int sum = values.stream().mapToInt(Integer::intValue).sum();
  112.         return new Pair<>(key, sum);
  113.     }
  114. }
  115. // 测试并行 MapReduce 框架
  116. public class ParallelWordCountExample {
  117.     public static void main(String[] args) throws InterruptedException, ExecutionException {
  118.         // 输入数据
  119.         List<String> inputs = Arrays.asList(
  120.             "Hello world hello",
  121.             "MapReduce is powerful",
  122.             "Hello MapReduce world",
  123.             "Java is great",
  124.             "Hello from the other side"
  125.         );
  126.         // 创建多个 Mapper 和 Reducer 实例
  127.         List<Mapper> mappers = Arrays.asList(new WordCountMapper(), new WordCountMapper());
  128.         List<Reducer> reducers = Arrays.asList(new WordCountReducer(), new WordCountReducer());
  129.         // 执行 MapReduce
  130.         ParallelMapReduceFramework framework = new ParallelMapReduceFramework(mappers, reducers);
  131.         Map<String, Integer> wordCounts = framework.execute(inputs);
  132.         // 输出结果
  133.         wordCounts.forEach((word, count) -> System.out.println(word + ": " + count));
  134.     }
  135. }
复制代码
Yarn(MapReduce2)

MapReducer1的问题:

前面提到的MapReducer的模子的问题:

  • JobTracker的负载(可扩展性):MapReduce 1 中,,obtracker 同时负责作业调度(将任务与 tasktracker 匹配)和任务进度监控(跟踪任务、重启失败或迟缓的任务;记录任务流水,如维护计数器的计数)。当任务实例过多时,会导致体系无法再扩展
  • JobTracker的可用性:由于JobTracker管理所有应用的状态,为了实现其可用性,就要创建副本并同步内存中的数据,强一致性意味着性能的消耗,弱一致性意味着故障规复时的数据的差异。
  • 节点的利用率:MapReduce 1中,每个tasktracker都配置有若干固定长度和类型的slot,这些slot是静态分配的,在配置的时间就被划分为map slot和reduce slot。一个map slot仅能用于运行一个map任务,一个reduce slot仅能用于运行一个reduce任务,以是分配不当就会导致体系性能低下
针对以上几个问题,Yarn将jobTracker的工作拆分,分为资源管理器(负责作业调度),以及application Master(负责任务进度监控,一个MapperReducer应用对应一个application Master),通过合理的资源分配进步节点的利用率,每个应用交由一个master管理,可以无限扩展资源避免单点的负载过大,还可以通过zookeeper等机制分别实现资源管理器和application master的高可用(如失败后再次申请资源)。
另有一个优点就是实现了多租户,对于资源的抽象和分配机制,可以在Yarn上构建不同的应用,如Spark等

MapReducer2的工作流程

   阐明几个概念
  

  • Yarn的资源管理器,申请的资源即为一个container,可以指定其计算机的资源数量(内存和CPU),可以理解为之前版本的DataNode拆分成了多个容器
  • Map,reduce的实行是容器中的历程,而前面提到的Application Master实际上也是容器中的历程,只是功能较为特殊
  • 一个MapReduce应用对应一个Application Master
  • 一个节点对应一个Node Manager,负责管理该节点上的所有容器和心跳
  



  • 1-5的步骤较为简单,就是创建一个container用于生成application master。详细是资源管理器收到调用它的submitApplication()消息后,便将请求传递给 YARN调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master的历程
  • 接下来,它担当来自共享文件体系的、在客户端计算的输入分片(步骤7)。然后对每一个分片创建一个 map任务对象以及由mapreduce.job.reduces 属性(通过作业的 setNumReduceTasks()方法设置),根据配置确定多个reduce任务对象
  • application master就会为该作业中的所有map任务和reduce任务向资源管理器请求实行详细任务的容器
  • 一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master就通过与节点管理器通信来启动容器(步骤9a和9b)。该任务由主类为YarnChild的一个 Java 应用程序实行。在它运行任务之前,首先将任务须要的资源本地化,包罗作业的配置、JAR 文件和所有来自分布式缓存的文件(步骤 10)。最后,运行map任务或reduce任务(步骤11)。
Spark

实用场景

Spark 最突出的表现在于它能将作业与作业之间产生的大规模的工作数据集存储在内存中。MapReduce的数据集始终须要从磁盘上加载。从Spark 处理模子中获益最大的两种应用类型分别为迭代算法(即对一个数据集重复应用某个函数,直至满足退出条件)和交互式分析(用户向数据集发出一系列专用的探索性查询,好比查出数据后,根据数据在举行多次筛选分析)。
相干概念

RDD

RDD(Resilient Distributed Dataset)是 Spark 的焦点抽象,用于表现分布式、不变、容错的数据集。
RDD的分区(Partition) 是 Spark 对数据的根本并行处理单位。RDD会被分割成多个分区,并在多个节点上并行处理,以实现高效的分布式计算。分区的巨细就是HDFS 文件默认块巨细(HDFS block size),通常为 128MB。可以理解为从HDFS中读取block到内存中,并且对这个block举行计算的抽象
举个例子,我们在 Spark 中使用 textFile(“hdfs://path/to/my_data.txt”) 读取时,RDD 会被划分为2 个分区,分别对应:
  1. Partition 1 在 Node A 处理 Block 1 数据
  2. Partition 2 在 Node B 处理 Block 2 数据
复制代码
RDD的操作

RDD的生成,一个是读取文件,在不同的block中生身分区,另有一个就是对现有的RDD举行转换
  1. JavaRDD<String> rdd = sc.textFile("hdfs://path/to/my_data.txt");
  2. JavaRDD<String> filteredRDD = rdd.filter(line -> line.contains("error"));
  3. JavaPairRDD<String, Integer> counts = filteredRDD
  4.                                       .mapToPair(line -> new Tuple2<>(line, 1))
  5.                                       .reduceByKey((a, b) -> a + b);
  6. // 触发RDD开始转换,foreach函数也可以触发
  7. counts.saveAsTextFile("hdfs://path/to/output");
复制代码
以上的代码中的filter,mapToPair,reduceByKey就是一系列动作,类似于相应式编程中的订阅发布,当实际订阅(也就是这里的saveAsTextFile实行时),才会触发发布(RDD的开始转换),即惰性转换
RDD的持久化

spark的特性就是可以或许生存中间数据在内存默认RDD不会生存到内存中,当我们须要某部分数据时,可以手动将其生存到内存中,方便下一次计算,以下调用cache缓存

当实行RDD转换时,提示已经生存:

当下一次对该RDD重新举行不同的转换时,提示用到了缓存:

DAG和Stage

多个RDD的转换形成一个有向无环图,当一些可以基于本地的RDD的操作举行的转换的实行链,即每个分区的数据只依靠于上游 RDD (在本地)的一个分区的话(如 map(), filter()),我们当然可以在同一个节点中举行这个转换操作,这称为窄依靠
如果当前 RDD 的分区须要依靠多个上游 RDD 分区(如 reduceByKey(), groupBy()),那么会发生 Shuffle,相当于触发了一次reduce操作,这成为宽依靠
而这个DAG会因为出现宽依靠而举行stage的划分,将实行链拆分成不同的stage部分,每一个stage交给一个节点运行
这个很好理解,相当于上游RDD的reduceByKey须要举行一个类似于mapreducer中的shuffle操作,卑鄙RDD的reduceByKey须要举行一个类似于mapreducer中的reduce操作,而reduce的数据非本地的,且对应的所须要的reduce的任务数量也不等同于map阶段的任务数,以是重新分配

实行过程



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊雷无声

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表