1 MapReduce 概述

1.1 MapReduce 的定义

MapReduce 是 Google 在 2004 年提出的一种 分布式计算框架,核心思想是把大任务拆分成很多小任务,交由多台普通服务器并行处理,最后再汇总结果。

在 Hadoop 中,MapReduce 成为最经典的 大数据计算引擎

  • 输入:大规模数据集(TB~PB 级别);
  • 过程:通过 Map 阶段切分 & 分布式处理,再通过 Reduce 阶段汇总;
  • 输出:结构化结果(例如:统计词频、聚合指标)。

换句话说:
如果把大数据处理比作做饭,MapReduce 就像一条流水线:先把食材切开分发(Map),然后统一汇总烹饪(Reduce)。


1.2 编程模型:Map 和 Reduce

MapReduce 的设计借鉴了函数式编程思想。用户只需实现两个核心函数:

  • Map(key, value) → list(key, value)
    • 输入:原始数据记录(如一行文本)。
    • 输出:一组中间键值对。
  • Reduce(key, list(values)) → list(value)
    • 输入:相同 key 的所有 value 列表。
    • 输出:聚合后的结果。

ASCII 图示:

输入数据


[Map 阶段] → (k1,v1) (k1,v2) (k2,v1) ...


[Shuffle 阶段] → (k1,[v1,v2]) (k2,[v1]) ...


[Reduce 阶段] → (k1,result1) (k2,result2) ...


1.3 适用场景与局限性

适用场景

  • 日志分析:统计 PV、UV,计算错误率。
  • 搜索引擎:倒排索引构建。
  • 大规模数据聚合:电商销量统计、银行交易流水计算。

局限性

  • 迭代计算性能差:每次迭代都要写入 HDFS,像机器学习这种多轮迭代效率低。
  • 实时性不足:MapReduce 是批处理,不适合秒级响应的场景。
  • 编程模型简单但僵硬:只能写 Map 和 Reduce,不够灵活。

表格:MapReduce 优劣对比

特性优点缺点
编程模型简单只需写 Map & Reduce 函数灵活性差,难以表达复杂逻辑
容错性强节点失败可重试,不影响整体计算任务调度 & 数据传输开销大
适合批处理高吞吐,能处理 PB 级别数据延迟高,不适合实时场景

💡 面试补充

  • 问:MapReduce 和传统单机处理的区别?
    👉 单机处理靠一台机器从头到尾完成;MapReduce 则把任务拆成小块并行处理,速度更快且能横向扩展。
  • 问:为什么说 MapReduce 容错性好?
    👉 因为任务失败时,框架会自动在另一台节点上重新运行该任务,用户几乎无需关心

2 MapReduce 执行流程

MapReduce 程序的执行就像一场大型接力赛:

  • 客户端是发令员,
  • NameNode/ResourceManager 是裁判,
  • 每个 Task 是运动员,
  • 最后结果由 Reduce 队伍汇总交卷。

整个流程分为 作业提交、Map 阶段、Shuffle 阶段、Reduce 阶段、输出结果


2.1 作业提交

  1. 客户端:用户提交 MapReduce 作业,指定输入路径、输出路径、Mapper/Reducer 类。
  2. ResourceManager / JobTracker:接收到请求后,负责作业调度。
  3. HDFS:把输入文件切分成多个 Split,每个 Split 对应一个 Map Task。

ASCII 流程图:

Client → ResourceManager → NodeManager/TaskTracker → 启动 Task

└── 输入文件切分 → [Split1, Split2, Split3...]


2.2 Map 阶段

  • 每个 Map Task 处理一个 Split(通常是 HDFS 的一个 Block)。
  • Mapper 的输入是 (key, value),输出是一组中间 (key, value)
  • 输出会先写入内存缓冲区,再定期溢写到磁盘。

例子:WordCount

  • 输入:Hello Hadoop
  • Map 输出:(Hello,1) (Hadoop,1)

2.3 Shuffle 阶段(灵魂环节)

Shuffle 是 MapReduce 的精华:负责 排序 + 分区 + 分组 + 数据传输

  1. 分区(Partitioner):决定同一个 key 去哪个 Reduce。
  2. 排序(Sort):Map 输出按 key 排序,保证同 key 的记录集中。
  3. 归并(Merge):多路归并小文件,避免过多磁盘 IO。
  4. 传输(Copy):Map 节点把结果通过网络发送到对应 Reduce 节点。

ASCII 图:

Map 输出: (k1,1) (k2,1) (k1,1) ...


Partition → Sort → Merge → Copy → Reduce 节点

表格:Shuffle 关键步骤

步骤作用对性能的影响
分区决定 key 分布位置不均衡会导致数据倾斜
排序同 key 聚集排序效率影响整体性能
归并合并小文件过多小文件会拖慢速度
数据传输Map → Reduce 网络拷贝网络带宽瓶颈明显

2.4 Reduce 阶段

  • Reduce 输入:来自 Shuffle 的 (key, [v1,v2,v3...])
  • Reduce 执行:对同一个 key 的所有 value 进行聚合/统计。
  • Reduce 输出:写入 HDFS。

例子:WordCount

  • 输入:(Hadoop, [1,1,1])
  • 输出:(Hadoop, 3)

2.5 输出结果

  • 最终结果以文件形式存放在 HDFS 的指定目录。
  • 每个 Reduce Task 会生成一个结果文件,多个文件合起来就是完整结果。

💡 面试补充

  • Shuffle 为什么是 MapReduce 的核心?
    👉 因为它保证了同一个 key 的数据能落到同一个 Reduce,逻辑正确性全靠它。
  • 数据倾斜问题如何解决?
    👉 调整 Partitioner,自定义分区函数,或者在 Map 阶段做预聚合(Combiner)。
  • MapReduce 为什么慢?
    👉 主要瓶颈在 Shuffle 阶段:排序 + 网络传输 + 小文件过多。

3 MapReduce 重要组件

Hadoop 的 MapReduce 框架不仅仅是“写 Map 和 Reduce 函数”,在背后还依赖一整套运行组件来保证任务能够被正确调度、执行和监控。主要涉及以下角色:

  • JobTracker / ResourceManager(负责全局调度)
  • TaskTracker / NodeManager(负责本地执行)
  • ApplicationMaster(作业级协调者)

3.1 JobTracker / ResourceManager

Hadoop 1.xHadoop 2.x+ 中,调度组件的名字不同:

  • Hadoop 1.x:由 JobTracker 负责整个集群的作业调度。
  • Hadoop 2.x+:YARN 引入了 ResourceManager (RM),替代 JobTracker。

职责对比表:

版本组件主要职责
Hadoop 1.xJobTracker资源管理 + 作业调度 + 故障恢复
Hadoop 2.x+ResourceManager仅负责全局资源调度,作业管理交给 AM

改进点:

  • 在 1.x 中,JobTracker 同时负责调度和作业状态管理,容易成为性能瓶颈。
  • 在 2.x 中,RM 把作业级别的调度拆分给 ApplicationMaster,极大提升了可扩展性。

3.2 TaskTracker / NodeManager

  • TaskTracker (TT):Hadoop 1.x 的本地执行守护进程,负责接收 JobTracker 下发的任务并执行。
  • NodeManager (NM):Hadoop 2.x+ 的节点级资源管理器,负责监控容器(Container)的资源使用情况(CPU、内存、磁盘、网络)。

ASCII 图示:

ResourceManager


NodeManager (Node1) ── Container ── MapTask / ReduceTask
NodeManager (Node2) ── Container ── MapTask / ReduceTask

重点:

  • NodeManager 负责节点级的“看门人”工作,保证任务不越界(防止某个 Task 占满内存拖死机器)。
  • 如果某个 Task 失败,会被重新分配到别的节点。

3.3 ApplicationMaster (AM)

在 Hadoop 2.x 的 YARN 架构中,每个应用(Application)都有一个独立的 ApplicationMaster

  • 负责向 ResourceManager 申请资源。
  • 负责与 NodeManager 协调,启动和监控具体的 Map/Reduce 任务。
  • 当应用完成后,AM 会释放资源并退出。

表格:JobTracker vs ApplicationMaster

特性JobTracker (Hadoop 1.x)ApplicationMaster (Hadoop 2.x+)
管理范围集群级应用级
单点瓶颈问题易产生瓶颈分散到各个 AM
故障恢复JobTracker 失败影响所有作业AM 失败仅影响单个作业

💡 运维与面试补充

  • 为什么要引入 ApplicationMaster?
    👉 为了解决 JobTracker 的单点瓶颈问题,把作业调度分散到不同 AM。
  • NodeManager 和 DataNode 有什么区别?
    👉 NodeManager 管理计算资源(CPU、内存),DataNode 管理存储(Block)。
  • 面试高频问:MapReduce 在 YARN 上是怎么跑的?
    👉 简答:Client 提交作业 → ResourceManager → 分配 AM → AM 向 RM 申请 Container → NodeManager 启动 Task → 执行完成 → 回收资源。


4 案例示例:WordCount

WordCount(单词计数)是 MapReduce 的“Hello World”。它展示了 MapReduce 的编程模型和执行流程,也是理解分布式计算的最好入门案例。


4.1 输入数据

假设我们在 HDFS 上有一个文件 input.txt,内容如下:

Hadoop MapReduce Hadoop
Big Data Hadoop

这份文本包含多个单词,目标是统计每个单词出现的次数。


4.2 Map 阶段输出

Map 阶段会逐行读取输入,按空格分词,并输出键值对 (word, 1)

示例:

输入行Map 输出
Hadoop MapReduce Hadoop(Hadoop,1) (MapReduce,1) (Hadoop,1)
Big Data Hadoop(Big,1) (Data,1) (Hadoop,1)

4.3 Shuffle 阶段分组

Shuffle 阶段会对 Map 输出的中间结果按 key 进行分区、排序和分组。

ASCII 图示:

Map 输出:
(Hadoop,1) (MapReduce,1) (Hadoop,1) (Big,1) (Data,1) (Hadoop,1)

│ Shuffle:分区 + 排序 + 分组

Reduce 输入:
(Hadoop, [1,1,1])
(MapReduce, [1])
(Big, [1])
(Data, [1])


4.4 Reduce 阶段合并

每个 Reduce Task 会对相同 key 的所有 value 进行聚合。

示例:

Reduce 输入Reduce 输出
(Hadoop, [1,1,1])(Hadoop, 3)
(MapReduce, [1])(MapReduce, 1)
(Big, [1])(Big, 1)
(Data, [1])(Data, 1)

4.5 最终结果

最终,Reduce 输出会被写入 HDFS 的输出目录(例如 /output),内容类似:

Big 1
Data 1
Hadoop 3
MapReduce 1


4.6 程序代码(简化版 Java)

public class WordCount {
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}

💡 面试补充

  • 为什么 WordCount 常被用来讲解 MapReduce?
    👉 因为它能清楚展现“Map → Shuffle → Reduce”的完整过程,逻辑简单但能涵盖核心。
  • 如果某个单词特别多,可能导致什么问题?
    👉 数据倾斜(skew),因为某个 Reduce 需要处理过多的数据,可以通过 自定义分区器Combiner 来缓解。
  • 如何优化 WordCount?
    👉 在 Map 阶段加一个 Combiner,先在本地做小范围统计,减少 Shuffle 的数据量。