Skip to content

PageRank 与图计算

图问题与 MapReduce

图是一种强大的数据结构,广泛用于表示网页链接、社交网络、交通网络等关系数据。常见的图算法包括最短路径、最小生成树、广度优先搜索和 PageRank 等。在 MapReduce 中处理图数据有两个核心挑战:如何表示图结构,以及如何遍历图。

图的表示方法

邻接表:每个节点存储其邻居列表。表示紧凑,容易获取出度信息,但不方便获取入度。

A → [B, C]
B → [C]
C → [A]

邻接矩阵:N×N 矩阵,M[i,j]=1 表示节点 i 到 j 有边。便于数学运算,但对稀疏图浪费空间。

对于大规模图数据(如数十亿网页的链接图),邻接矩阵的空间开销 O(N²) 往往不可接受,因此 MapReduce 中通常采用邻接表表示。

PageRank 算法

基本思想

PageRank 是 Google 搜索引擎的核心排序算法,其基本思想是:一个网页的重要性取决于链接到它的网页的质量和数量。被许多优质网页链接的网页,自身也更可能是优质网页。

简化模型

对于网页 i,其 PageRank 值为所有链入网页的贡献之和:

\[PR(i) = \sum_{j \in B_i} \frac{PR(j)}{L_j}\]

其中 B_i 是所有链接到 i 的网页集合,L_j 是网页 j 的出度。每个网页将自己的 PageRank 值平均分配给它链接到的所有网页。

用矩阵表示为 R = H × R,其中 H 是转移矩阵(列归一化),R 是 PageRank 值的列向量。通过幂迭代法反复乘以 H 直到 R 收敛。

简化模型的问题

实际的网络链接图中存在两种特殊情况:

Rank Leak(排名泄漏):如果一个网页没有出度链接,PageRank 值会在迭代中不断泄漏,最终所有网页的 PR 值趋向于 0。

Rank Sink(排名下沉):如果一组网页形成闭环但没有入度链接,它们会不断"吸收"其他网页贡献的 PR 值,而外部网页的 PR 值趋向于 0。

随机浏览模型

为了解决上述问题,引入阻尼因子 d(通常 d = 0.85)。假设用户以概率 d 点击当前页面的链接继续浏览,以概率 (1-d) 随机跳转到任意网页:

\[PR(i) = \frac{1-d}{N} + d \sum_{j \in B_i} \frac{PR(j)}{L_j}\]

其中 N 是网页总数。这个模型保证了:

  • 每次迭代都有 (1-d)/N 的"注入值",防止 PR 值泄漏
  • 马尔可夫链收敛,存在唯一解
  • 更符合用户的实际浏览行为

MapReduce 实现

PageRank 的 MapReduce 实现分为三个阶段:

Phase 1: GraphBuilder

解析原始网页链接数据,构建邻接表。输入为原始网页文本,输出为 (URL, (初始PR值, 链出列表))

public class GraphBuilderMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 输入格式: "URL  link1 link2 link3 ..."
        String[] parts = value.toString().split("\\s+");
        String url = parts[0];
        StringBuilder links = new StringBuilder();
        for (int i = 1; i < parts.length; i++) {
            if (i > 1) links.append(",");
            links.append(parts[i]);
        }
        // 输出: <URL, "PR_init|link1,link2,link3">
        double initPR = 0.5;  // 初始 PR 值(课程示例使用 0.5)
        context.write(new Text(url),
            new Text(initPR + "|" + links.toString()));
    }
}

该阶段的 Reduce 不需要做任何处理,直接输出即可。

Phase 2: PageRankIter

迭代计算 PageRank 值。这是核心阶段,每个迭代包含一个完整的 MapReduce Job。

Mapper:对每个网页,将其当前 PR 值平均分配给所有链出网页,同时保留图结构信息。

public class PageRankMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key: URL, value: "currentPR|link1,link2,..."
        String[] parts = value.toString().split("\\|");
        double currentPR = Double.parseDouble(parts[0]);
        String[] links = parts.length > 1 ? parts[1].split(",") : new String[0];
        int outDegree = links.length;

        // 1. 向每个链出网页发送贡献值
        double contribution = currentPR / outDegree;
        for (String link : links) {
            context.write(new Text(link), new Text("C|" + contribution));
        }

        // 2. 保留图结构(必须传递,否则迭代中图信息会丢失)
        context.write(key, new Text("G|" + parts[1]));
    }
}

Reducer:汇总所有贡献值,计算新的 PR 值。

public class PageRankReducer extends Reducer<Text, Text, Text, Text> {
    private static final double D = 0.85;

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sumContributions = 0;
        String links = "";

        for (Text val : values) {
            String[] parts = val.toString().split("\\|", 2);
            if (parts[0].equals("C")) {
                sumContributions += Double.parseDouble(parts[1]);
            } else if (parts[0].equals("G")) {
                links = parts[1];  // 保留图结构
            }
        }

        // PageRank 公式: (1-d)/N + d * sum(contributions)
        int N = context.getConfiguration().getInt("num.pages", 1);
        double newPR = (1 - D) / N + D * sumContributions;

        context.write(key, new Text(newPR + "|" + links));
    }
}

Phase 3: RankViewer

将最终结果按 PR 值从大到小排序输出。

// 自定义 FloatWritable,使排序从大到小
public class DescFloatWritable extends FloatWritable {
    @Override
    public int compareTo(FloatWritable o) {
        return -super.compareTo(o);  // 反序
    }
}

public class RankViewerMapper extends Mapper<LongWritable, Text, DescFloatWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\\|");
        String url = parts[0];
        float pr = Float.parseFloat(parts[1]);
        context.write(new DescFloatWritable(pr), new Text(url));
    }
}

多趟 MapReduce 驱动

public class PageRankDriver {
    private static final int ITERATIONS = 10;

    public static void main(String[] args) throws Exception {
        // Phase 1: 构建图
        String[] gbArgs = {args[0], args[1] + "/iter0"};
        GraphBuilder.main(gbArgs);

        // Phase 2: 迭代计算
        for (int i = 0; i < ITERATIONS; i++) {
            String[] iterArgs = {
                args[1] + "/iter" + i,
                args[1] + "/iter" + (i + 1)
            };
            PageRankIter.main(iterArgs);
        }

        // Phase 3: 排序输出
        String[] rvArgs = {
            args[1] + "/iter" + ITERATIONS,
            args[1] + "/final"
        };
        RankViewer.main(rvArgs);
    }
}

迭代终止条件

PageRank 迭代可以选择以下任一条件终止:

  • 各网页 PR 值不再变化(变化量小于阈值)
  • PR 值排序不再变化
  • 达到固定迭代次数(实践中最常用,通常 10-20 次)

实际应用

PageRank 算法的思想不仅适用于网页排序,还可以推广到其他领域:

  • 社交网络影响力分析:将用户视为网页,关注关系视为链接
  • 论文引用分析:高被引论文具有更高的"学术 PageRank"
  • 推荐系统:用户-物品二部图上的 PageRank 可用于推荐
  • 知识图谱推理:在实体关系图上计算重要性得分

PageRank 展示了 MapReduce 处理图算法的强大能力:将复杂的迭代计算分解为多个简单的 MapReduce Job,每个 Job 处理一轮迭代,通过多次执行完成整个计算过程。