高级 MapReduce 编程
基础的 MapReduce 编程模型虽然简洁,但面对复杂的实际需求时,往往需要更高级的技术手段。本章介绍 Combiner 优化、自定义数据类型、复合键排序、Join 操作、链式处理等高级编程技术。
Combiner 局部聚合
原理
Combiner 在 Map 端执行一次局部 Reduce 操作,减少 Shuffle 阶段需要传输的数据量。对于 WordCount,如果一个 Map 任务产生了 1000 个 (hello, 1),Combiner 可以在本地将其合并为 (hello, 1000),将网络传输量减少 1000 倍。
使用限制
Combiner 只能在 Reduce 操作满足交换律和结合律时安全使用。以下场景适用:
- 求和(SUM)、计数(COUNT)
- 求最大值(MAX)、最小值(MIN)
以下场景不适用:
- 求平均值(AVERAGE)——
(2+4)/2 ≠ (2/2 + 4/2)的中间结果无法正确合并 - 中位数(MEDIAN)
自定义数据类型
Writable 接口
Hadoop 的序列化机制基于 Writable 接口。当 Map 或 Reduce 需要输出自定义类型时,必须实现该接口:
public class TemperaturePair implements WritableComparable<TemperaturePair> {
private int year;
private double temperature;
public TemperaturePair() {}
public TemperaturePair(int year, double temperature) {
this.year = year;
this.temperature = temperature;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeDouble(temperature);
}
@Override
public void readFields(DataInput in) throws IOException {
year = in.readInt();
temperature = in.readDouble();
}
@Override
public int compareTo(TemperaturePair other) {
int cmp = Integer.compare(this.year, other.year);
if (cmp != 0) return cmp;
return Double.compare(this.temperature, other.temperature);
}
// hashCode() 和 equals() 也需要实现
}
复合键排序(Secondary Sort)
问题场景
假设我们有气象数据 (年份, 温度),需要找出每年的最高温度。MapReduce 框架默认只按 key 排序,value 列表的顺序是不确定的。如果我们希望同一个年份的温度按降序排列,就需要用到 Secondary Sort。
实现策略
- 复合键:将
(年份, 温度)组合为一个复合 key - 自定义分区器:保证同年份的数据路由到同一 Reducer
- 自定义排序:先按年份升序,同年份内按温度降序
- 自定义分组器:保证同年份的所有记录进入同一次 reduce 调用
// 自定义分组比较器:只按年份分组
public class YearGroupComparator extends WritableComparator {
protected YearGroupComparator() {
super(TemperaturePair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TemperaturePair pa = (TemperaturePair) a;
TemperaturePair pb = (TemperaturePair) b;
return Integer.compare(pa.getYear(), pb.getYear());
}
}
// Job 配置
job.setPartitionerClass(YearPartitioner.class); // 按年份分区
job.setSortComparatorClass(YearTempComparator.class); // 复合排序
job.setGroupingComparatorClass(YearGroupComparator.class); // 按年份分组
Join 操作
Reduce Side Join
最通用的 Join 方式,适用于任意大小的数据集。核心思想是在 Map 阶段为每条记录打上来源标签,Reduce 阶段根据标签将不同数据源的记录配对。
// Map 阶段:为每条记录添加来源标签
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
private String tag; // "customer" 或 "order"
@Override
protected void setup(Context context) {
tag = context.getConfiguration().get("data.source");
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
outKey.set(parts[0]); // join key (如 customer_id)
outValue.set(tag + "," + value.toString());
context.write(outKey, outValue);
}
}
// Reduce 阶段:根据标签配对
public class JoinReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> customers = new ArrayList<>();
List<String> orders = new ArrayList<>();
for (Text val : values) {
String[] parts = val.toString().split(",", 2);
if (parts[0].equals("customer")) {
customers.add(parts[1]);
} else {
orders.add(parts[1]);
}
}
// 笛卡尔积连接
for (String c : customers) {
for (String o : orders) {
context.write(key, new Text(c + "," + o));
}
}
}
}
Reduce Side Join 的缺点是所有数据都需要经过 Shuffle,网络开销大。
Map Side Join(Distributed Cache)
当一个数据集较小(能放入内存)时,可以将其分发到所有 Map 节点,在 Map 端直接完成 Join,避免 Shuffle。
// 在 Driver 中将小表加入分布式缓存
job.addCacheFile(new URI("/data/small_table.txt"));
// 在 Mapper 的 setup 阶段读取小表到内存
@Override
protected void setup(Context context) throws IOException {
Map<String, String> smallTable = new HashMap<>();
URI[] cacheFiles = context.getCacheFiles();
BufferedReader reader = new BufferedReader(
new FileReader(new File(cacheFiles[0].getPath())));
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
smallTable.put(parts[0], line);
}
reader.close();
}
// 在 map 方法中直接查找并连接
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
String joinKey = parts[0];
String smallRecord = smallTable.get(joinKey);
if (smallRecord != null) {
context.write(new Text(joinKey),
new Text(value.toString() + "," + smallRecord));
}
}
Map Side Join 没有 Shuffle 开销,性能远优于 Reduce Side Join。
链式 MapReduce
ChainMapper / ChainReducer
当需要对数据进行多步处理时,可以使用 ChainMapper 和 ChainReducer 将多个 Map 和 Reduce 串联在一个 Job 中:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 第一个 Mapper
ChainMapper.addMapper(job, CleanMapper.class,
LongWritable.class, Text.class, Text.class, Text.class, conf);
// Reducer
ChainReducer.setReducer(job, AggregateReducer.class,
Text.class, Text.class, Text.class, Text.class, conf);
// Reducer 之后的 Mapper(后处理)
ChainReducer.addMapper(job, FormatMapper.class,
Text.class, Text.class, Text.class, Text.class, conf);
链式处理减少了 Job 之间的磁盘 I/O,但增加了单个 Job 的复杂度。
DAG 执行控制
JobControl
当多个 MapReduce Job 之间存在依赖关系(Job B 需要 Job A 的输出作为输入)时,可以使用 JobControl 管理 DAG(有向无环图)执行:
JobControl control = new JobControl("pipeline");
ControlledJob jobA = new ControlledJob(conf1);
ControlledJob jobB = new ControlledJob(conf2);
ControlledJob jobC = new ControlledJob(conf3);
// jobB 依赖 jobA,jobC 依赖 jobB
jobB.addDependingJob(jobA);
jobC.addDependingJob(jobB);
control.addJob(jobA);
control.addJob(jobB);
control.addJob(jobC);
// 在新线程中运行
Thread runner = new Thread(control);
runner.start();
while (!control.allFinished()) {
Thread.sleep(5000);
}
control.stop();
MultipleOutputFormat
默认情况下,Reducer 的所有输出写入同一个文件。使用 MultipleOutputFormat 可以将不同类型的输出写入不同文件:
MultipleOutputs.addNamedOutput(job, "good",
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "bad",
TextOutputFormat.class, Text.class, Text.class);
// 在 Reducer 中
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) {
MultipleOutputs mos = new MultipleOutputs(context);
if (isGoodData(key)) {
mos.write("good", key, value);
} else {
mos.write("bad", key, value);
}
}
DBInputFormat / DBOutputFormat
当需要从关系数据库读取数据或向其写入结果时,可以使用 DBInputFormat 和 DBOutputFormat:
// 从数据库读取
DBConfiguration.configureDB(job.getConfiguration(),
"com.mysql.jdbc.Driver", "jdbc:mysql://host/db",
"user", "password");
job.setInputFormatClass(DBInputFormat.class);
DBInputFormat.setInput(job, MyRecord.class,
"SELECT id, name, value FROM my_table", "SELECT COUNT(*) FROM my_table");
// 写入数据库
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "output_table", "col1", "col2", "col3");
这使得 MapReduce 可以与现有的关系数据库系统无缝集成,适合数据迁移和 ETL 场景。
DataJoin 框架
Hadoop 提供了 DataJoin 框架来简化多数据源的 Reduce 端 Join 操作。该框架包含三个抽象类:
- DataJoinMapperBase:程序员继承此类并实现三个方法——
generateInputTag()(产生数据源标签)、generateGroupKey()(指定 Join Key)、generateTaggedMapOutput()(为记录贴标签)。基类的map()方法自动完成标签化记录的生成和输出 - DataJoinReducerBase:基类的
reduce()方法自动完成多数据源记录的笛卡尔积组合,程序员只需实现combine()方法进行具体的连接处理 - TaggedMapOutput:描述标签化数据记录,程序员需继承并实现
getData()方法
// Mapper 实现
public class JoinMapper extends DataJoinMapperBase {
protected Text generateInputTag(String inputFile) {
return new Text(inputFile.split("-")[0]); // 用文件名作标签
}
protected Text generateGroupKey(TaggedMapOutput record) {
String line = ((Text) record.getData()).toString();
return new Text(line.split(",")[0]); // CustomerID 作 GroupKey
}
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable tw = new TaggedWritable((Text) value);
tw.setTag(this.inputTag);
return tw;
}
}
// Reducer 实现
public class JoinReducer extends DataJoinReducerBase {
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2) return null; // 不足两个数据源,无需连接
String joined = "";
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String[] tokens = ((Text) tw.getData()).toString().split(",", 2);
if (i > 0) joined += ",";
joined += (i == 0) ? tokens[0] + tokens[1] : tokens[1];
}
return new TaggedWritable(new Text(joined));
}
}
全局参数传递
当 MapReduce 程序需要传递全局参数(如阈值、配置项)时,可以通过 Configuration 对象实现:
// Driver 中设置参数
conf.set("threshold", "0.5");
conf.setInt("max.iterations", 10);
// Mapper 或 Reducer 中读取参数
@Override
protected void setup(Context context) {
double threshold = Double.parseDouble(
context.getConfiguration().get("threshold"));
int maxIter = context.getConfiguration().getInt("max.iterations", 10);
}
对于较大的数据文件,可以将其分发到各节点的本地磁盘,再通过 DistributedCache 机制在 Mapper/Reducer 中读取。