Skip to content

MapReduce 算法设计模式

MapReduce 不仅是一个编程框架,更催生了一系列经典的数据处理算法设计模式。本章介绍几个典型的 MapReduce 算法,展示如何将复杂的数据处理任务转化为 Map 和 Reduce 操作。

TeraSort 全局排序

问题背景

2008 年,Hadoop 在 910 个节点的集群上用 209 秒完成了 1TB 数据(100 亿条记录)的排序,赢得了年度 TB 级排序基准测试。2009 年,Yahoo 将这一记录缩短到 62 秒。TeraSort 的核心挑战在于:如何将海量数据均匀分配到多个 Reducer 上进行排序,避免数据倾斜。

MapReduce 排序的基本思路

MapReduce 框架自带排序机制——Shuffle 阶段会自动对中间结果按键排序。因此,最简单的排序策略是:

map(k, v) → (k, v)       // Identity Mapper,直接输出
shuffle and sort           // 框架自动按键排序
reduce(k, [v]) → (k, v)   // Identity Reducer,直接输出

但问题在于:默认的 HashPartitioner 只保证相同 key 落到同一 Reducer,不保证全局有序。如果 key 分布不均匀,某些 Reducer 会收到过多数据,成为性能瓶颈。

TotalOrderPartitioner

Hadoop 提供的 TotalOrderPartitioner 解决了上述问题,其核心思想是采样 + 均匀划分

  1. 采样:从输入数据中随机抽取一小部分 key
  2. 排序分割:对采样数据排序后,等分为 N 份(N 为 Reducer 数量),得到 N-1 个分割点
  3. 区间划分:每个 Reducer 处理一个 key 区间,保证 Reducer i 的所有 key 小于 Reducer i+1 的所有 key

例如,设 Reducer 数为 3,采样得到 9 个 key:1, 22, 55, 60, 62, 66, 68, 70, 90,则两个分割点为 60 和 68,划分区间为 (-∞, 60)[60, 68)[68, +∞)

Trie 树优化

当 key 类型为 BinaryComparable(如 Text)时,TotalOrderPartitioner 使用两级 Trie 树来高效定位 key 所属区间。Trie 树基于 key 的前两个字节构建,最多可索引 256×256 = 65536 个 Reducer,足以满足大多数场景。查找时间复杂度为 O(key 长度),远优于二分查找。

使用方式

Job job = Job.getInstance(conf);
job.setPartitionerClass(TotalOrderPartitioner.class);

// 采样生成分割点文件
InputSampler.Sampler<IntWritable, Text> sampler =
    new InputSampler.RandomSampler<>(0.01, 10000);
InputSampler.writePartitionFile(job, sampler);

// 设置分割文件路径
TotalOrderPartitioner.setPartitionFile(job,
    new Path("/tmp/partition.lst"));

单词同现矩阵

问题定义

单词同现矩阵是一个 N×N 的二维矩阵(N 为词汇量),矩阵元素 M[i,j] 表示单词 Wi 和 Wj 在一定窗口范围内共同出现的次数。同现矩阵是许多文本分析算法的基础,如词义消歧、主题模型等。

Pairs 方法

最直观的方法是让 Mapper 为每个单词与其窗口内的邻居生成一个 (单词对, 1) 的键值对:

Mapper 伪代码:
for each term w in document:
    for each neighbor u of w (within window):
        emit((w, u), 1)

Reducer 伪代码:
for each pair (w, u):
    s = sum of all counts
    emit((w, u), s)

示例:语料为 "we are not what we want to be",窗口大小为 2。

Mapper 输出:

(we, are) → 1
(we, not) → 1
(are, we) → 1
(are, not) → 1
(are, what) → 1
(not, are) → 1
(not, what) → 1
...

Reducer 汇总后得到同现次数,最终构成矩阵。

Combiner 优化

Pairs 方法会产生大量中间键值对。可以在 Mapper 端使用 Combiner 进行局部聚合,减少网络传输量。由于同现计数满足结合律(求和),Combiner 可以安全使用。

实现代码

public class CoOccurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private Text pair = new Text();
    private int windowSize = 2;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\\s+");
        for (int i = 0; i < words.length; i++) {
            for (int j = i + 1; j < Math.min(i + windowSize + 1, words.length); j++) {
                pair.set(words[i] + "," + words[j]);
                context.write(pair, ONE);
                // 如果是无向同现,还需输出反向
                pair.set(words[j] + "," + words[i]);
                context.write(pair, ONE);
            }
        }
    }
}

public class CoOccurrenceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Stripes 方法

Pairs 方法的替代方案是 Stripes 方法:Mapper 为每个单词生成一个局部的哈希映射(Map<邻居词, 计数>),以单词本身为 key,整个映射为 value。Reducer 合并同一单词的所有映射。Stripes 方法减少了中间键值对的数量,但 value 更大,且需要自定义 Writable 类型。

文档倒排索引

基本概念

倒排索引(Inverted Index)是搜索引擎的核心数据结构。给定一个词(term),能快速获取包含该词的文档列表。搜索引擎的工作流程分为三步:

  1. 爬取(Crawling):收集网页内容
  2. 索引(Indexing):构建倒排索引(离线)
  3. 检索(Retrieval):根据查询返回结果(在线)

简单倒排索引

最简单的倒排索引只记录每个词出现在哪些文档中:

输入:
  doc1: "one fish two fish"
  doc2: "red fish blue fish"
  doc3: "one red bird"

输出:
  one  → doc1, doc3
  fish → doc1, doc2
  two  → doc1
  red  → doc2, doc3
  blue → doc2
  bird → doc3

Mapper:读取每个文档,对每个词输出 (词, 文档名#行偏移),记录词出现的位置信息。

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        Text word = new Text();
        Text fileInfo = new Text(fileName + "#" + key.toString());
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, fileInfo);
        }
    }
}

Reducer:收集同一词的所有文档名,拼接为列表。

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text val : values) {
            if (sb.length() > 0) sb.append(", ");
            sb.append(val.toString());
        }
        context.write(key, new Text(sb.toString()));
    }
}

带词频的倒排索引

实际的搜索引擎需要记录更多属性(词频、位置等),称为 Payload。改进后的 Mapper 输出 (term, docid) 作为 key,词频作为 value:

// Map 输出: (term, docid) → tf
// 自定义 Partitioner 确保同一 term 路由到同一 Reducer
public class TermPartitioner extends HashPartitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String term = key.toString().split(",")[0];
        return super.getPartition(new Text(term), value, numPartitions);
    }
}

这种"值转键"(value-to-key conversion)技巧让 MapReduce 框架自动完成排序,避免 Reducer 内存溢出。自定义 Partitioner 确保同一 term 的所有 (term, docid) 对被路由到同一个 Reducer。

专利引用分析

数据集

美国专利引用数据集包含两个文件:

  • cite75_99.txt:引用关系,每行格式为 CITING,CITED(引用专利号, 被引专利号)
  • apat63_99.txt:专利描述信息,包含专利号、年份、国家、类别等 20+ 个字段

专利被引列表

将引用关系倒排,生成每个专利被哪些专利引用的列表:

public class CitationMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        context.write(new Text(parts[1]), new Text(parts[0]));  // cited → citing
    }
}

public class CitationReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text val : values) {
            if (sb.length() > 0) sb.append(",");
            sb.append(val.toString());
        }
        context.write(key, new Text(sb.toString()));
    }
}

输出示例:1000046 → 5918892,5525001,5609991(专利 1000046 被 3 个专利引用)。

专利被引次数统计

经典的 WordCount 模式,统计每个专利被引用了多少次:

public class CitationCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        context.write(new Text(parts[1]), ONE);  // cited → 1
    }
}

Reducer 累加即可得到每个专利的被引次数。

被引次数分布统计

对被引次数再次进行 WordCount,可以得到"被引 k 次的专利有多少个"的分布直方图。这个两级 MapReduce 管道展示了如何通过串联多个 Job 实现复杂的数据分析流程。

年度/国家专利统计

从专利描述数据中提取特定字段进行统计:

public class YearPatentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] cols = value.toString().split(",");
        String year = cols[1];   // GYEAR 列
        context.write(new Text(year), ONE);
    }
}

通过修改列索引,同样的框架可以统计不同国家的专利数量、不同类别的专利数量等。

设计模式总结

模式 核心思想 典型应用
全序分区 采样 + 均匀划分,保证全局有序 TeraSort
Pairs 为每对元素生成键值对 同现矩阵、关联分析
Stripes 每个元素附带局部聚合结果 同现矩阵(更高效)
值转键 将 value 中的信息合并到 key 中,利用框架排序 带词频的倒排索引
自定义 Partitioner 控制 key 的路由策略 复合 key 场景
链式 Job 多个 MapReduce 串联 多级统计分析

这些设计模式构成了 MapReduce 算法设计的基础工具箱。掌握这些模式后,面对新的数据处理问题时,可以快速将其映射到 MapReduce 编程模型中。