PageRank 与图计算
图问题与 MapReduce
图是一种强大的数据结构,广泛用于表示网页链接、社交网络、交通网络等关系数据。常见的图算法包括最短路径、最小生成树、广度优先搜索和 PageRank 等。在 MapReduce 中处理图数据有两个核心挑战:如何表示图结构,以及如何遍历图。
图的表示方法
邻接表:每个节点存储其邻居列表。表示紧凑,容易获取出度信息,但不方便获取入度。
邻接矩阵:N×N 矩阵,M[i,j]=1 表示节点 i 到 j 有边。便于数学运算,但对稀疏图浪费空间。
对于大规模图数据(如数十亿网页的链接图),邻接矩阵的空间开销 O(N²) 往往不可接受,因此 MapReduce 中通常采用邻接表表示。
PageRank 算法
基本思想
PageRank 是 Google 搜索引擎的核心排序算法,其基本思想是:一个网页的重要性取决于链接到它的网页的质量和数量。被许多优质网页链接的网页,自身也更可能是优质网页。
简化模型
对于网页 i,其 PageRank 值为所有链入网页的贡献之和:
其中 B_i 是所有链接到 i 的网页集合,L_j 是网页 j 的出度。每个网页将自己的 PageRank 值平均分配给它链接到的所有网页。
用矩阵表示为 R = H × R,其中 H 是转移矩阵(列归一化),R 是 PageRank 值的列向量。通过幂迭代法反复乘以 H 直到 R 收敛。
简化模型的问题
实际的网络链接图中存在两种特殊情况:
Rank Leak(排名泄漏):如果一个网页没有出度链接,PageRank 值会在迭代中不断泄漏,最终所有网页的 PR 值趋向于 0。
Rank Sink(排名下沉):如果一组网页形成闭环但没有入度链接,它们会不断"吸收"其他网页贡献的 PR 值,而外部网页的 PR 值趋向于 0。
随机浏览模型
为了解决上述问题,引入阻尼因子 d(通常 d = 0.85)。假设用户以概率 d 点击当前页面的链接继续浏览,以概率 (1-d) 随机跳转到任意网页:
其中 N 是网页总数。这个模型保证了:
- 每次迭代都有 (1-d)/N 的"注入值",防止 PR 值泄漏
- 马尔可夫链收敛,存在唯一解
- 更符合用户的实际浏览行为
MapReduce 实现
PageRank 的 MapReduce 实现分为三个阶段:
Phase 1: GraphBuilder
解析原始网页链接数据,构建邻接表。输入为原始网页文本,输出为 (URL, (初始PR值, 链出列表))。
public class GraphBuilderMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 输入格式: "URL link1 link2 link3 ..."
String[] parts = value.toString().split("\\s+");
String url = parts[0];
StringBuilder links = new StringBuilder();
for (int i = 1; i < parts.length; i++) {
if (i > 1) links.append(",");
links.append(parts[i]);
}
// 输出: <URL, "PR_init|link1,link2,link3">
double initPR = 0.5; // 初始 PR 值(课程示例使用 0.5)
context.write(new Text(url),
new Text(initPR + "|" + links.toString()));
}
}
该阶段的 Reduce 不需要做任何处理,直接输出即可。
Phase 2: PageRankIter
迭代计算 PageRank 值。这是核心阶段,每个迭代包含一个完整的 MapReduce Job。
Mapper:对每个网页,将其当前 PR 值平均分配给所有链出网页,同时保留图结构信息。
public class PageRankMapper extends Mapper<Text, Text, Text, Text> {
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
// key: URL, value: "currentPR|link1,link2,..."
String[] parts = value.toString().split("\\|");
double currentPR = Double.parseDouble(parts[0]);
String[] links = parts.length > 1 ? parts[1].split(",") : new String[0];
int outDegree = links.length;
// 1. 向每个链出网页发送贡献值
double contribution = currentPR / outDegree;
for (String link : links) {
context.write(new Text(link), new Text("C|" + contribution));
}
// 2. 保留图结构(必须传递,否则迭代中图信息会丢失)
context.write(key, new Text("G|" + parts[1]));
}
}
Reducer:汇总所有贡献值,计算新的 PR 值。
public class PageRankReducer extends Reducer<Text, Text, Text, Text> {
private static final double D = 0.85;
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
double sumContributions = 0;
String links = "";
for (Text val : values) {
String[] parts = val.toString().split("\\|", 2);
if (parts[0].equals("C")) {
sumContributions += Double.parseDouble(parts[1]);
} else if (parts[0].equals("G")) {
links = parts[1]; // 保留图结构
}
}
// PageRank 公式: (1-d)/N + d * sum(contributions)
int N = context.getConfiguration().getInt("num.pages", 1);
double newPR = (1 - D) / N + D * sumContributions;
context.write(key, new Text(newPR + "|" + links));
}
}
Phase 3: RankViewer
将最终结果按 PR 值从大到小排序输出。
// 自定义 FloatWritable,使排序从大到小
public class DescFloatWritable extends FloatWritable {
@Override
public int compareTo(FloatWritable o) {
return -super.compareTo(o); // 反序
}
}
public class RankViewerMapper extends Mapper<LongWritable, Text, DescFloatWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split("\\|");
String url = parts[0];
float pr = Float.parseFloat(parts[1]);
context.write(new DescFloatWritable(pr), new Text(url));
}
}
多趟 MapReduce 驱动
public class PageRankDriver {
private static final int ITERATIONS = 10;
public static void main(String[] args) throws Exception {
// Phase 1: 构建图
String[] gbArgs = {args[0], args[1] + "/iter0"};
GraphBuilder.main(gbArgs);
// Phase 2: 迭代计算
for (int i = 0; i < ITERATIONS; i++) {
String[] iterArgs = {
args[1] + "/iter" + i,
args[1] + "/iter" + (i + 1)
};
PageRankIter.main(iterArgs);
}
// Phase 3: 排序输出
String[] rvArgs = {
args[1] + "/iter" + ITERATIONS,
args[1] + "/final"
};
RankViewer.main(rvArgs);
}
}
迭代终止条件
PageRank 迭代可以选择以下任一条件终止:
- 各网页 PR 值不再变化(变化量小于阈值)
- PR 值排序不再变化
- 达到固定迭代次数(实践中最常用,通常 10-20 次)
实际应用
PageRank 算法的思想不仅适用于网页排序,还可以推广到其他领域:
- 社交网络影响力分析:将用户视为网页,关注关系视为链接
- 论文引用分析:高被引论文具有更高的"学术 PageRank"
- 推荐系统:用户-物品二部图上的 PageRank 可用于推荐
- 知识图谱推理:在实体关系图上计算重要性得分
PageRank 展示了 MapReduce 处理图算法的强大能力:将复杂的迭代计算分解为多个简单的 MapReduce Job,每个 Job 处理一轮迭代,通过多次执行完成整个计算过程。