Skip to content

MapReduce 与 Hadoop 基础

为什么需要分布式计算

单台计算机的处理能力存在物理极限。当数据规模达到 TB 甚至 PB 级别时,即使使用最强大的单机也无法在合理时间内完成处理。分布式计算通过将数据和计算任务分散到大量普通机器上并行执行,突破了单机性能瓶颈。

Google 在 2004 年发表的 MapReduce 论文(Dean & Ghemawat, OSDI 2004)提出了一种全新的分布式编程模型,让程序员无需关心分布式系统的底层细节(数据分布、任务调度、容错处理等),只需编写 Map 和 Reduce 两个函数即可完成大规模数据处理。据该论文统计,截至 2004 年底 Google 内部已编写了数百个 MapReduce 程序,每月执行数万个 MapReduce 作业。

MapReduce 的三层设计思想

MapReduce 在三个层面上解决了大规模数据处理问题:

  1. 分而治之:对相互间没有计算依赖关系的大数据,最自然的并行方式就是将数据划分为多个子块,分配给多个节点并行处理
  2. 抽象模型:借鉴函数式语言 Lisp 的思想,用 Map 和 Reduce 两个函数提供高层并行编程抽象——程序员只需描述"做什么",不需要关心"怎么做"
  3. 统一框架:提供统一的计算框架,自动完成数据划分、任务调度、节点通信、容错处理等系统层细节,将程序员从底层复杂性中解放出来

核心设计原则

MapReduce 的成功不仅在于编程模型,更在于一系列精心设计的系统原则:

  • 向外扩展而非向上扩展(Scale out, not up):使用大量廉价的低端商用服务器构建集群,而非昂贵的高端服务器。低端服务器性价比约为高端服务器的 4 倍(不含外存则约 12 倍)
  • 失效是常态(Assume failures are common):Google 在全球使用百万台以上服务器,节点失效不可避免。系统通过心跳检测、任务重调度、检查点等机制保证容错性
  • 将处理移向数据(Moving processing to the data):优先在数据所在的本地节点执行计算,减少网络传输开销。当无法本地处理时,就近寻找同机架节点
  • 顺序处理避免随机访问:磁盘顺序访问与随机访问性能差距巨大(100 亿条记录的数据库,更新 1% 需要 1 个月,而顺序重写仅需 1 天)

MapReduce 编程模型

核心思想

MapReduce 借鉴了函数式编程中 Lisp 语言的 Map 和 Reduce 操作:

  1. Map 阶段:将输入数据分割为独立的块,每个块由一个 Map 任务处理。Map 函数接收一个键值对 (k1, v1),输出一组中间键值对 [(k2, v2)]
  2. Shuffle & Sort 阶段:框架自动将相同 key 的中间结果聚合到一起,形成 (k2, [v2, v2, ...])
  3. Reduce 阶段:对每个 key 及其对应的所有 value 进行汇总处理,输出最终结果 (k3, v3)
graph LR
    subgraph Input
        A1["(k1, v1)"]
        A2["(k1, v2)"]
        A3["(k1, v3)"]
    end

    subgraph Map
        B1["Map 1"]
        B2["Map 2"]
        B3["Map 3"]
    end

    subgraph Shuffle
        C["Group by key"]
    end

    subgraph Reduce
        D1["Reduce 1"]
        D2["Reduce 2"]
    end

    subgraph Output
        E1["(k2, v2)"]
        E2["(k3, v3)"]
    end

    A1 --> B1
    A2 --> B2
    A3 --> B3
    B1 --> C
    B2 --> C
    B3 --> C
    C --> D1
    C --> D2
    D1 --> E1
    D2 --> E2

WordCount 示例

WordCount 是 MapReduce 的"Hello World",展示了最基本的编程模式。

Mapper:将文本拆分为单词,为每个单词输出计数 1。

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");
        for (String token : tokens) {
            if (!token.isEmpty()) {
                word.set(token.toLowerCase());
                context.write(word, ONE);
            }
        }
    }
}

Reducer:对同一单词的所有计数求和。

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Driver:配置 Job 参数并提交运行。

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Combiner 与 Partitioner

Combiner 本地聚合

在 Shuffle 阶段之前,Map 端可以先执行一次局部合并(Combiner),减少需要传输到 Reducer 的数据量。对于 WordCount,Combiner 可以在 Map 端先将相同单词的计数求和,将 (hello, 1), (hello, 1), (hello, 1) 合并为 (hello, 3)

job.setCombinerClass(WordCountReducer.class);

Combiner 本质上是一个本地 Reducer,但不是所有场景都适用——只有当 Reduce 操作满足交换律和结合律时(如求和、最大值),才能安全使用 Combiner。

Partitioner 数据分区

Partitioner 控制 Map 输出的中间键值对如何分配到各个 Reducer。默认的 HashPartitioner 使用 key.hashCode() % numReducers 进行分区。当需要自定义分区逻辑时(如按地域分区),可以实现自定义 Partitioner:

public class RegionPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        String region = key.toString().substring(0, 2);
        return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Google GFS 分布式文件系统

GFS(Google File System)是 MapReduce 的底层数据存储支撑,其设计原则深刻影响了 HDFS。

架构

GFS 采用 Master/ChunkServer 架构:

  • GFS Master:保存三种元数据——命名空间(目录结构)、Chunk 与文件名的映射表、Chunk 副本位置信息。前两种通过操作日志提供容错,第三种直接保存在 ChunkServer 上
  • ChunkServer:存储实际数据。每个数据块默认 64MB,每个块默认 3 个副本。数据块以本地 Linux 文件形式存储,每个块进一步划分为 64KB 的子块,每个子块有 32 位校验和

数据访问流程

  1. 应用程序向 GFS Master 请求文件名或数据块索引
  2. Master 返回数据块所在的 ChunkServer 位置信息
  3. 应用程序直接访问 ChunkServer 读取数据

关键特点:应用程序访问具体数据时不经过 Master,避免了 Master 成为访问瓶颈。由于大数据存储在不同 ChunkServer 中,应用程序可实现并发访问。

BigTable 结构化数据表

GFS 是文件系统,难以提供结构化数据的存储和访问管理。BigTable 在 GFS 之上提供了一个分布式多维表管理系统。

数据模型

BigTable 中的数据通过三个维度索引:

  • 行关键字(Row Key):任意字节串(不超过 64KB),数据按行关键字字典序排列。URL 地址倒排存储(如 com.cnn.www)便于查找和压缩
  • 列关键字(Column Key):组织为"列族"(Column Family),族中数据按列存放并压缩。列关键字格式为 族名:列名(如 anchor:cnnsi.com
  • 时间戳(Timestamp):同一 URL 的网页会不断更新,需要时间戳区分不同版本。支持保留最近 n 个版本或限定时间内的所有版本

查询模型:(row:string, column:string, time:int64) → 结果数据字节串

存储结构

  • SSTable:BigTable 内部的基本存储结构,存储在 GFS 上,数据划分为 64KB 的子块,每个子块有独立索引便于快速查找
  • Tablet(子表):大表在水平方向上被分为多个子表,每个子表由多个 SSTable 构成
  • 子表寻址:采用 3 级 B+ 树索引——从 Chubby 锁服务获取 METADATA 根子表的位置,由根子表找到其他 METADATA 子表,最终获取用户子表(Tablet)所在的 TabletServer 位置

Hadoop 生态系统

HDFS 分布式文件系统

HDFS(Hadoop Distributed File System)是 Google GFS 的开源实现,是 Hadoop 的存储基础。

graph TB
    Client[客户端] --> NN[NameNode]
    NN --> DN1[DataNode 1]
    NN --> DN2[DataNode 2]
    NN --> DN3[DataNode 3]

    subgraph "HDFS 集群"
        NN
        DN1 --- B1[Block 1]
        DN1 --- B2[Block 2]
        DN2 --- B3[Block 2 replica]
        DN2 --- B4[Block 3]
        DN3 --- B5[Block 1 replica]
        DN3 --- B6[Block 3 replica]
    end

核心设计

  • 分块存储:文件被切分为固定大小的块(Hadoop 1.x 默认 64MB,Hadoop 2.x/3.x 默认 128MB),分布存储在集群中的不同节点上
  • 多副本冗余:每个块默认保存 3 个副本,按块随机选择存储节点,支持机架感知(Rack Awareness)策略
  • NameNode 元数据管理:NameNode 维护文件系统的命名空间和块的位置信息,通过心跳检测 DataNode 状态
  • 顺序读优化:HDFS 对顺序读进行了优化,支持大量数据的快速顺序读出。数据支持一次写入、多次读取,不支持已写入数据的更新(允许在文件尾部追加)
  • 数据本地性:MapReduce 优先将计算任务调度到数据所在的节点上执行,减少网络传输

YARN 资源管理

YARN(Yet Another Resource Negotiator)是 Hadoop 2.x 引入的资源管理框架,将资源管理和作业调度分离:

  • ResourceManager:全局资源管理器,负责整个集群的资源分配
  • NodeManager:每个节点上的代理,管理本节点的资源和容器
  • ApplicationMaster:每个应用程序的管理进程,负责任务调度和监控

MapReduce 执行流程

一个 MapReduce 作业的完整执行过程:

  1. 输入分片InputFormat 将输入文件划分为多个 InputSplit,每个 split 由一个 Map 任务处理
  2. 记录读取RecordReader 将 split 中的记录逐条读取并转换为键值对
  3. Map 处理:Mapper 处理每个键值对,输出中间结果
  4. Combine(可选):Map 端本地聚合
  5. Partition & Shuffle:中间结果按 key 分区并传输到对应的 Reducer
  6. Sort:Reducer 端对收到的数据按键排序
  7. Reduce 处理:Reducer 对每个 key 及其 value 列表进行处理
  8. 输出OutputFormat 将结果写入 HDFS

MapReduce 的局限性

尽管 MapReduce 在大数据处理领域取得了巨大成功,但它也有明显的局限:

  • 不适合迭代计算:每次迭代都需要读写磁盘,开销巨大。例如机器学习算法(如 K-Means、PageRank)需要多次迭代,每轮迭代都要将中间结果写入 HDFS 再读出。后来 Spark 通过将中间数据缓存在内存中(RDD 抽象)解决了这个问题,对迭代工作负载可获得高达 10-100 倍的加速
  • 不适合实时处理:MapReduce 是批处理框架,作业启动和调度的开销导致延迟较高(通常秒级到分钟级),不适合需要毫秒级响应的实时场景
  • 编程模型受限:所有计算都必须表达为 Map 和 Reduce 两级操作,复杂算法(如多阶段数据管道、图算法)难以自然表达
  • Shuffle 开销大:Map 阶段的输出需要通过网络传输到 Reduce 节点,大量数据在网络中传输成为性能瓶颈

这些局限性催生了新一代大数据处理框架的诞生:Spark(2014)通过内存计算和更丰富的 API 解决了迭代计算和编程灵活性问题;Flink(2014)通过真正的流处理解决了实时性问题。