Skip to content

高级 MapReduce 编程

基础的 MapReduce 编程模型虽然简洁,但面对复杂的实际需求时,往往需要更高级的技术手段。本章介绍 Combiner 优化、自定义数据类型、复合键排序、Join 操作、链式处理等高级编程技术。

Combiner 局部聚合

原理

Combiner 在 Map 端执行一次局部 Reduce 操作,减少 Shuffle 阶段需要传输的数据量。对于 WordCount,如果一个 Map 任务产生了 1000 个 (hello, 1),Combiner 可以在本地将其合并为 (hello, 1000),将网络传输量减少 1000 倍。

job.setCombinerClass(WordCountReducer.class);

使用限制

Combiner 只能在 Reduce 操作满足交换律结合律时安全使用。以下场景适用:

  • 求和(SUM)、计数(COUNT)
  • 求最大值(MAX)、最小值(MIN)

以下场景不适用:

  • 求平均值(AVERAGE)——(2+4)/2 ≠ (2/2 + 4/2) 的中间结果无法正确合并
  • 中位数(MEDIAN)

自定义数据类型

Writable 接口

Hadoop 的序列化机制基于 Writable 接口。当 Map 或 Reduce 需要输出自定义类型时,必须实现该接口:

public class TemperaturePair implements WritableComparable<TemperaturePair> {
    private int year;
    private double temperature;

    public TemperaturePair() {}

    public TemperaturePair(int year, double temperature) {
        this.year = year;
        this.temperature = temperature;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeDouble(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temperature = in.readDouble();
    }

    @Override
    public int compareTo(TemperaturePair other) {
        int cmp = Integer.compare(this.year, other.year);
        if (cmp != 0) return cmp;
        return Double.compare(this.temperature, other.temperature);
    }

    // hashCode() 和 equals() 也需要实现
}

复合键排序(Secondary Sort)

问题场景

假设我们有气象数据 (年份, 温度),需要找出每年的最高温度。MapReduce 框架默认只按 key 排序,value 列表的顺序是不确定的。如果我们希望同一个年份的温度按降序排列,就需要用到 Secondary Sort。

实现策略

  1. 复合键:将 (年份, 温度) 组合为一个复合 key
  2. 自定义分区器:保证同年份的数据路由到同一 Reducer
  3. 自定义排序:先按年份升序,同年份内按温度降序
  4. 自定义分组器:保证同年份的所有记录进入同一次 reduce 调用
// 自定义分组比较器:只按年份分组
public class YearGroupComparator extends WritableComparator {
    protected YearGroupComparator() {
        super(TemperaturePair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TemperaturePair pa = (TemperaturePair) a;
        TemperaturePair pb = (TemperaturePair) b;
        return Integer.compare(pa.getYear(), pb.getYear());
    }
}

// Job 配置
job.setPartitionerClass(YearPartitioner.class);      // 按年份分区
job.setSortComparatorClass(YearTempComparator.class); // 复合排序
job.setGroupingComparatorClass(YearGroupComparator.class); // 按年份分组

Join 操作

Reduce Side Join

最通用的 Join 方式,适用于任意大小的数据集。核心思想是在 Map 阶段为每条记录打上来源标签,Reduce 阶段根据标签将不同数据源的记录配对。

// Map 阶段:为每条记录添加来源标签
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    private String tag;  // "customer" 或 "order"

    @Override
    protected void setup(Context context) {
        tag = context.getConfiguration().get("data.source");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        outKey.set(parts[0]);  // join key (如 customer_id)
        outValue.set(tag + "," + value.toString());
        context.write(outKey, outValue);
    }
}

// Reduce 阶段:根据标签配对
public class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> customers = new ArrayList<>();
        List<String> orders = new ArrayList<>();

        for (Text val : values) {
            String[] parts = val.toString().split(",", 2);
            if (parts[0].equals("customer")) {
                customers.add(parts[1]);
            } else {
                orders.add(parts[1]);
            }
        }

        // 笛卡尔积连接
        for (String c : customers) {
            for (String o : orders) {
                context.write(key, new Text(c + "," + o));
            }
        }
    }
}

Reduce Side Join 的缺点是所有数据都需要经过 Shuffle,网络开销大。

Map Side Join(Distributed Cache)

当一个数据集较小(能放入内存)时,可以将其分发到所有 Map 节点,在 Map 端直接完成 Join,避免 Shuffle。

// 在 Driver 中将小表加入分布式缓存
job.addCacheFile(new URI("/data/small_table.txt"));

// 在 Mapper 的 setup 阶段读取小表到内存
@Override
protected void setup(Context context) throws IOException {
    Map<String, String> smallTable = new HashMap<>();
    URI[] cacheFiles = context.getCacheFiles();
    BufferedReader reader = new BufferedReader(
        new FileReader(new File(cacheFiles[0].getPath())));
    String line;
    while ((line = reader.readLine()) != null) {
        String[] parts = line.split(",");
        smallTable.put(parts[0], line);
    }
    reader.close();
}

// 在 map 方法中直接查找并连接
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] parts = value.toString().split(",");
    String joinKey = parts[0];
    String smallRecord = smallTable.get(joinKey);
    if (smallRecord != null) {
        context.write(new Text(joinKey),
            new Text(value.toString() + "," + smallRecord));
    }
}

Map Side Join 没有 Shuffle 开销,性能远优于 Reduce Side Join。

链式 MapReduce

ChainMapper / ChainReducer

当需要对数据进行多步处理时,可以使用 ChainMapperChainReducer 将多个 Map 和 Reduce 串联在一个 Job 中:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 第一个 Mapper
ChainMapper.addMapper(job, CleanMapper.class,
    LongWritable.class, Text.class, Text.class, Text.class, conf);

// Reducer
ChainReducer.setReducer(job, AggregateReducer.class,
    Text.class, Text.class, Text.class, Text.class, conf);

// Reducer 之后的 Mapper(后处理)
ChainReducer.addMapper(job, FormatMapper.class,
    Text.class, Text.class, Text.class, Text.class, conf);

链式处理减少了 Job 之间的磁盘 I/O,但增加了单个 Job 的复杂度。

DAG 执行控制

JobControl

当多个 MapReduce Job 之间存在依赖关系(Job B 需要 Job A 的输出作为输入)时,可以使用 JobControl 管理 DAG(有向无环图)执行:

JobControl control = new JobControl("pipeline");

ControlledJob jobA = new ControlledJob(conf1);
ControlledJob jobB = new ControlledJob(conf2);
ControlledJob jobC = new ControlledJob(conf3);

// jobB 依赖 jobA,jobC 依赖 jobB
jobB.addDependingJob(jobA);
jobC.addDependingJob(jobB);

control.addJob(jobA);
control.addJob(jobB);
control.addJob(jobC);

// 在新线程中运行
Thread runner = new Thread(control);
runner.start();

while (!control.allFinished()) {
    Thread.sleep(5000);
}
control.stop();

MultipleOutputFormat

默认情况下,Reducer 的所有输出写入同一个文件。使用 MultipleOutputFormat 可以将不同类型的输出写入不同文件:

MultipleOutputs.addNamedOutput(job, "good",
    TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "bad",
    TextOutputFormat.class, Text.class, Text.class);

// 在 Reducer 中
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) {
    MultipleOutputs mos = new MultipleOutputs(context);
    if (isGoodData(key)) {
        mos.write("good", key, value);
    } else {
        mos.write("bad", key, value);
    }
}

DBInputFormat / DBOutputFormat

当需要从关系数据库读取数据或向其写入结果时,可以使用 DBInputFormatDBOutputFormat

// 从数据库读取
DBConfiguration.configureDB(job.getConfiguration(),
    "com.mysql.jdbc.Driver", "jdbc:mysql://host/db",
    "user", "password");
job.setInputFormatClass(DBInputFormat.class);
DBInputFormat.setInput(job, MyRecord.class,
    "SELECT id, name, value FROM my_table", "SELECT COUNT(*) FROM my_table");

// 写入数据库
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "output_table", "col1", "col2", "col3");

这使得 MapReduce 可以与现有的关系数据库系统无缝集成,适合数据迁移和 ETL 场景。

DataJoin 框架

Hadoop 提供了 DataJoin 框架来简化多数据源的 Reduce 端 Join 操作。该框架包含三个抽象类:

  • DataJoinMapperBase:程序员继承此类并实现三个方法——generateInputTag()(产生数据源标签)、generateGroupKey()(指定 Join Key)、generateTaggedMapOutput()(为记录贴标签)。基类的 map() 方法自动完成标签化记录的生成和输出
  • DataJoinReducerBase:基类的 reduce() 方法自动完成多数据源记录的笛卡尔积组合,程序员只需实现 combine() 方法进行具体的连接处理
  • TaggedMapOutput:描述标签化数据记录,程序员需继承并实现 getData() 方法
// Mapper 实现
public class JoinMapper extends DataJoinMapperBase {
    protected Text generateInputTag(String inputFile) {
        return new Text(inputFile.split("-")[0]);  // 用文件名作标签
    }
    protected Text generateGroupKey(TaggedMapOutput record) {
        String line = ((Text) record.getData()).toString();
        return new Text(line.split(",")[0]);  // CustomerID 作 GroupKey
    }
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable tw = new TaggedWritable((Text) value);
        tw.setTag(this.inputTag);
        return tw;
    }
}

// Reducer 实现
public class JoinReducer extends DataJoinReducerBase {
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) return null;  // 不足两个数据源,无需连接
        String joined = "";
        for (int i = 0; i < values.length; i++) {
            TaggedWritable tw = (TaggedWritable) values[i];
            String[] tokens = ((Text) tw.getData()).toString().split(",", 2);
            if (i > 0) joined += ",";
            joined += (i == 0) ? tokens[0] + tokens[1] : tokens[1];
        }
        return new TaggedWritable(new Text(joined));
    }
}

全局参数传递

当 MapReduce 程序需要传递全局参数(如阈值、配置项)时,可以通过 Configuration 对象实现:

// Driver 中设置参数
conf.set("threshold", "0.5");
conf.setInt("max.iterations", 10);

// Mapper 或 Reducer 中读取参数
@Override
protected void setup(Context context) {
    double threshold = Double.parseDouble(
        context.getConfiguration().get("threshold"));
    int maxIter = context.getConfiguration().getInt("max.iterations", 10);
}

对于较大的数据文件,可以将其分发到各节点的本地磁盘,再通过 DistributedCache 机制在 Mapper/Reducer 中读取。