MapReduce 与 Hadoop 基础
为什么需要分布式计算
单台计算机的处理能力存在物理极限。当数据规模达到 TB 甚至 PB 级别时,即使使用最强大的单机也无法在合理时间内完成处理。分布式计算通过将数据和计算任务分散到大量普通机器上并行执行,突破了单机性能瓶颈。
Google 在 2004 年发表的 MapReduce 论文(Dean & Ghemawat, OSDI 2004)提出了一种全新的分布式编程模型,让程序员无需关心分布式系统的底层细节(数据分布、任务调度、容错处理等),只需编写 Map 和 Reduce 两个函数即可完成大规模数据处理。据该论文统计,截至 2004 年底 Google 内部已编写了数百个 MapReduce 程序,每月执行数万个 MapReduce 作业。
MapReduce 的三层设计思想
MapReduce 在三个层面上解决了大规模数据处理问题:
- 分而治之:对相互间没有计算依赖关系的大数据,最自然的并行方式就是将数据划分为多个子块,分配给多个节点并行处理
- 抽象模型:借鉴函数式语言 Lisp 的思想,用 Map 和 Reduce 两个函数提供高层并行编程抽象——程序员只需描述"做什么",不需要关心"怎么做"
- 统一框架:提供统一的计算框架,自动完成数据划分、任务调度、节点通信、容错处理等系统层细节,将程序员从底层复杂性中解放出来
核心设计原则
MapReduce 的成功不仅在于编程模型,更在于一系列精心设计的系统原则:
- 向外扩展而非向上扩展(Scale out, not up):使用大量廉价的低端商用服务器构建集群,而非昂贵的高端服务器。低端服务器性价比约为高端服务器的 4 倍(不含外存则约 12 倍)
- 失效是常态(Assume failures are common):Google 在全球使用百万台以上服务器,节点失效不可避免。系统通过心跳检测、任务重调度、检查点等机制保证容错性
- 将处理移向数据(Moving processing to the data):优先在数据所在的本地节点执行计算,减少网络传输开销。当无法本地处理时,就近寻找同机架节点
- 顺序处理避免随机访问:磁盘顺序访问与随机访问性能差距巨大(100 亿条记录的数据库,更新 1% 需要 1 个月,而顺序重写仅需 1 天)
MapReduce 编程模型
核心思想
MapReduce 借鉴了函数式编程中 Lisp 语言的 Map 和 Reduce 操作:
- Map 阶段:将输入数据分割为独立的块,每个块由一个 Map 任务处理。Map 函数接收一个键值对
(k1, v1),输出一组中间键值对[(k2, v2)] - Shuffle & Sort 阶段:框架自动将相同 key 的中间结果聚合到一起,形成
(k2, [v2, v2, ...]) - Reduce 阶段:对每个 key 及其对应的所有 value 进行汇总处理,输出最终结果
(k3, v3)
graph LR
subgraph Input
A1["(k1, v1)"]
A2["(k1, v2)"]
A3["(k1, v3)"]
end
subgraph Map
B1["Map 1"]
B2["Map 2"]
B3["Map 3"]
end
subgraph Shuffle
C["Group by key"]
end
subgraph Reduce
D1["Reduce 1"]
D2["Reduce 2"]
end
subgraph Output
E1["(k2, v2)"]
E2["(k3, v3)"]
end
A1 --> B1
A2 --> B2
A3 --> B3
B1 --> C
B2 --> C
B3 --> C
C --> D1
C --> D2
D1 --> E1
D2 --> E2
WordCount 示例
WordCount 是 MapReduce 的"Hello World",展示了最基本的编程模式。
Mapper:将文本拆分为单词,为每个单词输出计数 1。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final IntWritable ONE = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
if (!token.isEmpty()) {
word.set(token.toLowerCase());
context.write(word, ONE);
}
}
}
}
Reducer:对同一单词的所有计数求和。
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
Driver:配置 Job 参数并提交运行。
public class WordCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCount");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Combiner 与 Partitioner
Combiner 本地聚合
在 Shuffle 阶段之前,Map 端可以先执行一次局部合并(Combiner),减少需要传输到 Reducer 的数据量。对于 WordCount,Combiner 可以在 Map 端先将相同单词的计数求和,将 (hello, 1), (hello, 1), (hello, 1) 合并为 (hello, 3)。
Combiner 本质上是一个本地 Reducer,但不是所有场景都适用——只有当 Reduce 操作满足交换律和结合律时(如求和、最大值),才能安全使用 Combiner。
Partitioner 数据分区
Partitioner 控制 Map 输出的中间键值对如何分配到各个 Reducer。默认的 HashPartitioner 使用 key.hashCode() % numReducers 进行分区。当需要自定义分区逻辑时(如按地域分区),可以实现自定义 Partitioner:
public class RegionPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
String region = key.toString().substring(0, 2);
return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
Google GFS 分布式文件系统
GFS(Google File System)是 MapReduce 的底层数据存储支撑,其设计原则深刻影响了 HDFS。
架构
GFS 采用 Master/ChunkServer 架构:
- GFS Master:保存三种元数据——命名空间(目录结构)、Chunk 与文件名的映射表、Chunk 副本位置信息。前两种通过操作日志提供容错,第三种直接保存在 ChunkServer 上
- ChunkServer:存储实际数据。每个数据块默认 64MB,每个块默认 3 个副本。数据块以本地 Linux 文件形式存储,每个块进一步划分为 64KB 的子块,每个子块有 32 位校验和
数据访问流程
- 应用程序向 GFS Master 请求文件名或数据块索引
- Master 返回数据块所在的 ChunkServer 位置信息
- 应用程序直接访问 ChunkServer 读取数据
关键特点:应用程序访问具体数据时不经过 Master,避免了 Master 成为访问瓶颈。由于大数据存储在不同 ChunkServer 中,应用程序可实现并发访问。
BigTable 结构化数据表
GFS 是文件系统,难以提供结构化数据的存储和访问管理。BigTable 在 GFS 之上提供了一个分布式多维表管理系统。
数据模型
BigTable 中的数据通过三个维度索引:
- 行关键字(Row Key):任意字节串(不超过 64KB),数据按行关键字字典序排列。URL 地址倒排存储(如
com.cnn.www)便于查找和压缩 - 列关键字(Column Key):组织为"列族"(Column Family),族中数据按列存放并压缩。列关键字格式为
族名:列名(如anchor:cnnsi.com) - 时间戳(Timestamp):同一 URL 的网页会不断更新,需要时间戳区分不同版本。支持保留最近 n 个版本或限定时间内的所有版本
查询模型:(row:string, column:string, time:int64) → 结果数据字节串
存储结构
- SSTable:BigTable 内部的基本存储结构,存储在 GFS 上,数据划分为 64KB 的子块,每个子块有独立索引便于快速查找
- Tablet(子表):大表在水平方向上被分为多个子表,每个子表由多个 SSTable 构成
- 子表寻址:采用 3 级 B+ 树索引——从 Chubby 锁服务获取 METADATA 根子表的位置,由根子表找到其他 METADATA 子表,最终获取用户子表(Tablet)所在的 TabletServer 位置
Hadoop 生态系统
HDFS 分布式文件系统
HDFS(Hadoop Distributed File System)是 Google GFS 的开源实现,是 Hadoop 的存储基础。
graph TB
Client[客户端] --> NN[NameNode]
NN --> DN1[DataNode 1]
NN --> DN2[DataNode 2]
NN --> DN3[DataNode 3]
subgraph "HDFS 集群"
NN
DN1 --- B1[Block 1]
DN1 --- B2[Block 2]
DN2 --- B3[Block 2 replica]
DN2 --- B4[Block 3]
DN3 --- B5[Block 1 replica]
DN3 --- B6[Block 3 replica]
end
核心设计:
- 分块存储:文件被切分为固定大小的块(Hadoop 1.x 默认 64MB,Hadoop 2.x/3.x 默认 128MB),分布存储在集群中的不同节点上
- 多副本冗余:每个块默认保存 3 个副本,按块随机选择存储节点,支持机架感知(Rack Awareness)策略
- NameNode 元数据管理:NameNode 维护文件系统的命名空间和块的位置信息,通过心跳检测 DataNode 状态
- 顺序读优化:HDFS 对顺序读进行了优化,支持大量数据的快速顺序读出。数据支持一次写入、多次读取,不支持已写入数据的更新(允许在文件尾部追加)
- 数据本地性:MapReduce 优先将计算任务调度到数据所在的节点上执行,减少网络传输
YARN 资源管理
YARN(Yet Another Resource Negotiator)是 Hadoop 2.x 引入的资源管理框架,将资源管理和作业调度分离:
- ResourceManager:全局资源管理器,负责整个集群的资源分配
- NodeManager:每个节点上的代理,管理本节点的资源和容器
- ApplicationMaster:每个应用程序的管理进程,负责任务调度和监控
MapReduce 执行流程
一个 MapReduce 作业的完整执行过程:
- 输入分片:
InputFormat将输入文件划分为多个InputSplit,每个 split 由一个 Map 任务处理 - 记录读取:
RecordReader将 split 中的记录逐条读取并转换为键值对 - Map 处理:Mapper 处理每个键值对,输出中间结果
- Combine(可选):Map 端本地聚合
- Partition & Shuffle:中间结果按 key 分区并传输到对应的 Reducer
- Sort:Reducer 端对收到的数据按键排序
- Reduce 处理:Reducer 对每个 key 及其 value 列表进行处理
- 输出:
OutputFormat将结果写入 HDFS
MapReduce 的局限性
尽管 MapReduce 在大数据处理领域取得了巨大成功,但它也有明显的局限:
- 不适合迭代计算:每次迭代都需要读写磁盘,开销巨大。例如机器学习算法(如 K-Means、PageRank)需要多次迭代,每轮迭代都要将中间结果写入 HDFS 再读出。后来 Spark 通过将中间数据缓存在内存中(RDD 抽象)解决了这个问题,对迭代工作负载可获得高达 10-100 倍的加速
- 不适合实时处理:MapReduce 是批处理框架,作业启动和调度的开销导致延迟较高(通常秒级到分钟级),不适合需要毫秒级响应的实时场景
- 编程模型受限:所有计算都必须表达为 Map 和 Reduce 两级操作,复杂算法(如多阶段数据管道、图算法)难以自然表达
- Shuffle 开销大:Map 阶段的输出需要通过网络传输到 Reduce 节点,大量数据在网络中传输成为性能瓶颈
这些局限性催生了新一代大数据处理框架的诞生:Spark(2014)通过内存计算和更丰富的 API 解决了迭代计算和编程灵活性问题;Flink(2014)通过真正的流处理解决了实时性问题。