MapReduce 算法设计模式
MapReduce 不仅是一个编程框架,更催生了一系列经典的数据处理算法设计模式。本章介绍几个典型的 MapReduce 算法,展示如何将复杂的数据处理任务转化为 Map 和 Reduce 操作。
TeraSort 全局排序
问题背景
2008 年,Hadoop 在 910 个节点的集群上用 209 秒完成了 1TB 数据(100 亿条记录)的排序,赢得了年度 TB 级排序基准测试。2009 年,Yahoo 将这一记录缩短到 62 秒。TeraSort 的核心挑战在于:如何将海量数据均匀分配到多个 Reducer 上进行排序,避免数据倾斜。
MapReduce 排序的基本思路
MapReduce 框架自带排序机制——Shuffle 阶段会自动对中间结果按键排序。因此,最简单的排序策略是:
map(k, v) → (k, v) // Identity Mapper,直接输出
shuffle and sort // 框架自动按键排序
reduce(k, [v]) → (k, v) // Identity Reducer,直接输出
但问题在于:默认的 HashPartitioner 只保证相同 key 落到同一 Reducer,不保证全局有序。如果 key 分布不均匀,某些 Reducer 会收到过多数据,成为性能瓶颈。
TotalOrderPartitioner
Hadoop 提供的 TotalOrderPartitioner 解决了上述问题,其核心思想是采样 + 均匀划分:
- 采样:从输入数据中随机抽取一小部分 key
- 排序分割:对采样数据排序后,等分为 N 份(N 为 Reducer 数量),得到 N-1 个分割点
- 区间划分:每个 Reducer 处理一个 key 区间,保证 Reducer i 的所有 key 小于 Reducer i+1 的所有 key
例如,设 Reducer 数为 3,采样得到 9 个 key:1, 22, 55, 60, 62, 66, 68, 70, 90,则两个分割点为 60 和 68,划分区间为 (-∞, 60)、[60, 68)、[68, +∞)。
Trie 树优化
当 key 类型为 BinaryComparable(如 Text)时,TotalOrderPartitioner 使用两级 Trie 树来高效定位 key 所属区间。Trie 树基于 key 的前两个字节构建,最多可索引 256×256 = 65536 个 Reducer,足以满足大多数场景。查找时间复杂度为 O(key 长度),远优于二分查找。
使用方式
Job job = Job.getInstance(conf);
job.setPartitionerClass(TotalOrderPartitioner.class);
// 采样生成分割点文件
InputSampler.Sampler<IntWritable, Text> sampler =
new InputSampler.RandomSampler<>(0.01, 10000);
InputSampler.writePartitionFile(job, sampler);
// 设置分割文件路径
TotalOrderPartitioner.setPartitionFile(job,
new Path("/tmp/partition.lst"));
单词同现矩阵
问题定义
单词同现矩阵是一个 N×N 的二维矩阵(N 为词汇量),矩阵元素 M[i,j] 表示单词 Wi 和 Wj 在一定窗口范围内共同出现的次数。同现矩阵是许多文本分析算法的基础,如词义消歧、主题模型等。
Pairs 方法
最直观的方法是让 Mapper 为每个单词与其窗口内的邻居生成一个 (单词对, 1) 的键值对:
Mapper 伪代码:
for each term w in document:
for each neighbor u of w (within window):
emit((w, u), 1)
Reducer 伪代码:
for each pair (w, u):
s = sum of all counts
emit((w, u), s)
示例:语料为 "we are not what we want to be",窗口大小为 2。
Mapper 输出:
(we, are) → 1
(we, not) → 1
(are, we) → 1
(are, not) → 1
(are, what) → 1
(not, are) → 1
(not, what) → 1
...
Reducer 汇总后得到同现次数,最终构成矩阵。
Combiner 优化
Pairs 方法会产生大量中间键值对。可以在 Mapper 端使用 Combiner 进行局部聚合,减少网络传输量。由于同现计数满足结合律(求和),Combiner 可以安全使用。
实现代码
public class CoOccurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text pair = new Text();
private int windowSize = 2;
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (int i = 0; i < words.length; i++) {
for (int j = i + 1; j < Math.min(i + windowSize + 1, words.length); j++) {
pair.set(words[i] + "," + words[j]);
context.write(pair, ONE);
// 如果是无向同现,还需输出反向
pair.set(words[j] + "," + words[i]);
context.write(pair, ONE);
}
}
}
}
public class CoOccurrenceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
Stripes 方法
Pairs 方法的替代方案是 Stripes 方法:Mapper 为每个单词生成一个局部的哈希映射(Map<邻居词, 计数>),以单词本身为 key,整个映射为 value。Reducer 合并同一单词的所有映射。Stripes 方法减少了中间键值对的数量,但 value 更大,且需要自定义 Writable 类型。
文档倒排索引
基本概念
倒排索引(Inverted Index)是搜索引擎的核心数据结构。给定一个词(term),能快速获取包含该词的文档列表。搜索引擎的工作流程分为三步:
- 爬取(Crawling):收集网页内容
- 索引(Indexing):构建倒排索引(离线)
- 检索(Retrieval):根据查询返回结果(在线)
简单倒排索引
最简单的倒排索引只记录每个词出现在哪些文档中:
输入:
doc1: "one fish two fish"
doc2: "red fish blue fish"
doc3: "one red bird"
输出:
one → doc1, doc3
fish → doc1, doc2
two → doc1
red → doc2, doc3
blue → doc2
bird → doc3
Mapper:读取每个文档,对每个词输出 (词, 文档名#行偏移),记录词出现的位置信息。
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
Text word = new Text();
Text fileInfo = new Text(fileName + "#" + key.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, fileInfo);
}
}
}
Reducer:收集同一词的所有文档名,拼接为列表。
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text val : values) {
if (sb.length() > 0) sb.append(", ");
sb.append(val.toString());
}
context.write(key, new Text(sb.toString()));
}
}
带词频的倒排索引
实际的搜索引擎需要记录更多属性(词频、位置等),称为 Payload。改进后的 Mapper 输出 (term, docid) 作为 key,词频作为 value:
// Map 输出: (term, docid) → tf
// 自定义 Partitioner 确保同一 term 路由到同一 Reducer
public class TermPartitioner extends HashPartitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String term = key.toString().split(",")[0];
return super.getPartition(new Text(term), value, numPartitions);
}
}
这种"值转键"(value-to-key conversion)技巧让 MapReduce 框架自动完成排序,避免 Reducer 内存溢出。自定义 Partitioner 确保同一 term 的所有 (term, docid) 对被路由到同一个 Reducer。
专利引用分析
数据集
美国专利引用数据集包含两个文件:
cite75_99.txt:引用关系,每行格式为CITING,CITED(引用专利号, 被引专利号)apat63_99.txt:专利描述信息,包含专利号、年份、国家、类别等 20+ 个字段
专利被引列表
将引用关系倒排,生成每个专利被哪些专利引用的列表:
public class CitationMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
context.write(new Text(parts[1]), new Text(parts[0])); // cited → citing
}
}
public class CitationReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text val : values) {
if (sb.length() > 0) sb.append(",");
sb.append(val.toString());
}
context.write(key, new Text(sb.toString()));
}
}
输出示例:1000046 → 5918892,5525001,5609991(专利 1000046 被 3 个专利引用)。
专利被引次数统计
经典的 WordCount 模式,统计每个专利被引用了多少次:
public class CitationCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
context.write(new Text(parts[1]), ONE); // cited → 1
}
}
Reducer 累加即可得到每个专利的被引次数。
被引次数分布统计
对被引次数再次进行 WordCount,可以得到"被引 k 次的专利有多少个"的分布直方图。这个两级 MapReduce 管道展示了如何通过串联多个 Job 实现复杂的数据分析流程。
年度/国家专利统计
从专利描述数据中提取特定字段进行统计:
public class YearPatentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] cols = value.toString().split(",");
String year = cols[1]; // GYEAR 列
context.write(new Text(year), ONE);
}
}
通过修改列索引,同样的框架可以统计不同国家的专利数量、不同类别的专利数量等。
设计模式总结
| 模式 | 核心思想 | 典型应用 |
|---|---|---|
| 全序分区 | 采样 + 均匀划分,保证全局有序 | TeraSort |
| Pairs | 为每对元素生成键值对 | 同现矩阵、关联分析 |
| Stripes | 每个元素附带局部聚合结果 | 同现矩阵(更高效) |
| 值转键 | 将 value 中的信息合并到 key 中,利用框架排序 | 带词频的倒排索引 |
| 自定义 Partitioner | 控制 key 的路由策略 | 复合 key 场景 |
| 链式 Job | 多个 MapReduce 串联 | 多级统计分析 |
这些设计模式构成了 MapReduce 算法设计的基础工具箱。掌握这些模式后,面对新的数据处理问题时,可以快速将其映射到 MapReduce 编程模型中。