`
cocoIT
  • 浏览: 48077 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

MapReduce框架中PageRank算法的代码实现

 
阅读更多

主要包括5个类

PageRankNode:图中的节点类-代表一个页面

PageRankJob:实现分散各个节点的PR值的类

DistributionPRMass:实现dangling节点的PR值分散到其它节点的Job类

RangePartitioner:partition类 将连续的节点分配到同一个reduce中

PageRankDirver:整个工作的驱动类(主函数)


package com.zxx.PageRank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class PageRankJob
{
	public static final double d = 0.85;
	private static final double nodecount = 10;
	private static final double threshold=0.01;//收敛邻接点
	
	public static enum MidNodes
	{
		 // 记录已经收敛的个数
			Map, Reduce
	};

	public static class PageRankMaper extends Mapper<Object, Text, Text, Text>
	{
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			PageRankNode node = PageRankNode.InstanceFormString(value.toString());
			node.setOldPR(node.getNewPR());
			context.write(new Text(node.getId()), new Text(PageRankNode.toStringWithOutID(node)));

			for (String str : node.getDestNodes())
			{
				String outPR = new Double(node.getNewPR() / (double)node.getNumDest()).toString();
				context.write(new Text(str), new Text(outPR));
			}
		}
	}

	public static class PageRankJobReducer extends Reducer<Text, Text, Text, Text>
	{
		private double totalMass = Double.NEGATIVE_INFINITY; // 缓存每个key从其它点得到的全部PR值
		private double missMass=Double.NEGATIVE_INFINITY;

		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
			PageRankNode currentNode = new PageRankNode(key.toString());
			double inPR = 0.0;

			for (Text val : values)
			{
				String[] temp = val.toString().trim().split("\\s+");
				if (temp.length == 1) // 此时候只输出一个PR值
				{
					inPR += Double.valueOf(temp[0]);
				} else if (temp.length >= 4)
				{// 此时输出的是含有邻接点的节点全信息
					currentNode = PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());
				} else if (temp.length == 3)
				{ // 此时输出的点没有出度
					context.getCounter("PageRankJobReducer", "errornode").increment(1);
					currentNode=PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());
				}
			}
            if (currentNode.getNumDest()>=1)
			{
            	double newPRofD = (1 - PageRankJob.d) /(double) PageRankJob.nodecount + PageRankJob.d * inPR;
    			currentNode.setNewPR(newPRofD);
    			context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));
			}else if (currentNode.getNumDest()==0) {
				
				missMass=currentNode.getOldPR();//得到dangling节点的上一次的PR值,传播到下一个分布Pr的job
			}
			
			totalMass += inPR;
			double partPR=(currentNode.getNewPR()-currentNode.getOldPR())*(currentNode.getNewPR()-currentNode.getOldPR());
			if (partPR<=threshold)
			{
				context.getCounter(MidNodes.Reduce).increment(1);
			}
		}

		@Override
		public void cleanup(Context context) throws IOException, InterruptedException
		{
			// 将total记录到文件中
			Configuration conf = context.getConfiguration();
			String taskId = conf.get("mapred.task.id");
			String path = conf.get("PageRankMassPath");// 注意此处的path路径设置------------------
            
			if (missMass==Double.NEGATIVE_INFINITY)
			{
				return;
			}
			FileSystem fs = FileSystem.get(context.getConfiguration());
			FSDataOutputStream out = fs.create(new Path(path + "/"+"missMass"), false);
			out.writeDouble(missMass);
			out.close();
		}
	}
}
package com.zxx.PageRank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

import com.zxx.Graph.ArrayListOfInts;
import com.zxx.Graph.BFSNode;
import com.zxx.Graph.HMapII;
import com.zxx.Graph.MapII;
import com.zxx.Graph.ReachableNodes;

public class DistributionPRMass
{
	public class GraphMapper extends Mapper<Object, Text, Text, Text>
	{
		private double missingMass = 0.0;
	    private int nodeCnt = 0;
		@Override
		public void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration();

		    missingMass = (double)conf.getFloat("MissingMass", 0.0f);//该值等于1-totalMass
		    nodeCnt = conf.getInt("NodeCount", 0);
		}
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			PageRankNode currentNode=PageRankNode.InstanceFormString(value.toString().trim());
			currentNode.setOldPR(currentNode.getNewPR());
			
			double p=currentNode.getNewPR();
			double pnew=(1-PageRankJob.d)/(double)(nodeCnt-1)+PageRankJob.d*missingMass/(double)(nodeCnt-1);
			//double pnew=missingMass/(double)(nodeCnt-1);
			currentNode.setNewPR(p+pnew);
			context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));
		}

		@Override
		public void cleanup(Context context) throws IOException, InterruptedException
		{
			
		}
	}
}
package com.zxx.PageRank;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Partitioner;

public class RangePartitioner extends Partitioner<Text, Text> implements Configurable
{

	private int nodeCnt = 0;
	private Configuration conf;

	public RangePartitioner() {}
	@Override
	public Configuration getConf()
	{
		return conf;
	}

	@Override
	public void setConf(Configuration arg0)
	{
		this.conf = arg0;
	    configure();
	}

	@Override
	public int getPartition(Text arg0, Text arg1, int arg2)
	{
		return (int) ((float)(Integer.parseInt(arg0.toString()) / (float) nodeCnt) * arg2) % arg2;
	}
	private void configure()   //获得节点的总数
	{
		nodeCnt = conf.getInt("NodeCount", 0);
	}

}


package com.zxx.PageRank;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class PageRankDirver
{
    public static final int numNodes=5;  //节点数
    public static final int maxiter=10;   //最大收敛次数
	public static void main(String[] args) throws Exception
	{
		long count=0;  //缓存已经接近收敛的节点个数
		int it=1;
		int num=1;
		String input="/Graph/input/";
		String output="/Graph/output1";
		do{
			Job job=getPageRankJob(input, output);
			job.waitForCompletion(true);
			
			Counters counter = job.getCounters();
			count = counter.findCounter(PageRankJob.MidNodes.Reduce).getValue();
			
			
			input="/Graph/output"+it;
			it++;
			output="/Graph/output"+it;
			
		    Job job1=getDistrbuteJob(input,output);
		    job1.waitForCompletion(true);
		    
		    input="/Graph/output"+it;
			it++;
			output="/Graph/output"+it;
			
			if(num<maxiter)
			System.out.println("it:"+it+" "+count);
			num++;
		}while(count!=numNodes);

	}
	
	public static Job getPageRankJob(String inPath,String outPath) throws Exception
	{
		Configuration conf = new Configuration();
		Job job=new Job(conf,"PageRank job");
		
		job.getConfiguration().setInt("NodeCount", numNodes);
	    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
	    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
	    
	    job.getConfiguration().set("PageRankMassPath", "/mass");
	    

		job.setJarByClass(PageRankDirver.class);
		
		job.setNumReduceTasks(5);

		job.setMapperClass(PageRankJob.PageRankMaper.class);
		job.setReducerClass(PageRankJob.PageRankJobReducer.class);
		job.setPartitionerClass(RangePartitioner.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		FileInputFormat.addInputPath(job, new Path(inPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
		
		return job;	
	}

	public static Job getDistrbuteJob(String inPath,String outPath) throws Exception
	{
		Configuration conf = new Configuration();
		Job job=new Job(conf,"Ditribute job");
		
		double mass = Double.NEGATIVE_INFINITY;                //一下是读取dangling节点的PR值,将其分配到其他节点
	    FileSystem fs = FileSystem.get(conf);
	    for (FileStatus f : fs.listStatus(new Path("/mass/missMass")))
	    {
	      FSDataInputStream fin = fs.open(f.getPath());
	      mass = fin.readDouble();
	      fin.close();
	    }
	    job.getConfiguration().setFloat("MissingMass",(float)mass);
		job.getConfiguration().setInt("NodeCount", numNodes);
		job.getConfiguration().setInt("NodeCount", numNodes);
	    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
	    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
	    
	    job.getConfiguration().set("PageRankMassPath", "/mass");
	    

		job.setJarByClass(PageRankDirver.class);
		
		job.setNumReduceTasks(5);

		job.setMapperClass(PageRankJob.PageRankMaper.class);
		job.setReducerClass(PageRankJob.PageRankJobReducer.class);
		job.setPartitionerClass(RangePartitioner.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		FileInputFormat.addInputPath(job, new Path(inPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
		
		return job;	
	}
}



分享到:
评论

相关推荐

    PageRank算法实时大数据实验报告广工(Map Reduce)(附源码)

    实验内容 1. 采用基于“抽税”法在MapReduce框架下,分析图1的网页PageRank排名; 2. 图1中,若节点②和节点⑤是主题节点,采用面向主题的PageRank算法重新计算所有节点的PageRank值。

    MiniGoogle:由搜寻器,索引器,PageRank,MapReduce框架,搜索算法和前端组成

    迷你谷歌由搜寻器,索引器,PageRank,MapReduce框架,搜索算法和前端组成。 搜寻器,索引器和pagerank作为mapreduce作业运行。 该系统利用了一个主节点和八个工作/数据节点,这些节点通过REST调用进行通信。

    BigDataAnalysis_Exp3:实时大数据分析_PageRank算法

    2.图1中,若节点②和节点⑤是主题节点,采用面向主题的PageRank算法重新计算所有节点的PageRank值。二、实验设计(原理分析及流程)三、实验代码及数据记录1.代码1.0 文件结构图1.1 ENode.javapackage ...

    论文研究-一种Spark环境下的高效率大规模图数据处理机制.pdf

    最后通过PageRank和SSSP算法设计实验,与MapReduce框架和采用HDFS作持久层的Spark框架进行性能对比。实验证明提出的框架要比MapReduce框架快90倍,比采用HDFS作持久层的Spark框架快2倍,能够满足高效率图数据处理的...

    MapReduce之起源篇

    MapReduce的初衷是为了解决其搜索引擎中大规模的网页数据的并行化处理;到目前为止,Google公司内有上万个各种不同的算法问题和程序使用MapReduce进行处理 MapReduce 是一个使用简单的软件框架,基于它写出来的应用...

    使用MapReduce在大数据社交媒体网络中进行社区结构挖掘。

    社交媒体网络在人们的日常生活中发挥着越来越重要的作用。... 在现实世界中的社交媒体网络和人工网络上的经验结果表明,新框架在准确性,速度和可伸缩性方面优于我们以前的工作和一些著名的算法,例如Radetal,FastGN。

    maestro:在 FPGA 上使用异步累积更新加速迭代算法的框架

    在迭代算法中,通过对输入数据集(例如 PageRank,Dijkstra 的最短路径)执行重复计算得出最终结果。 并行化此类算法的现有技术使用诸如 MapReduce 和 Hadoop 之类的软件框架在集群中的多个基于 CPU 的工作站之间...

    PDM :基于Hadoop的并行数据分析系统 (2012年)

    提出了一款基于Hadoop的并行数据分析系统―――PDM.该系统拥有大量以MapReduce...介绍了基于电信数据的典型应用,如采用并行k均值和决策树算法实现的“套餐推荐”,利用并行PageRank算法实现的“营销关键点发现”等;最后

    Starred_Paper_Hadoop_Spark.docx

    本篇英文论文通过三个具体实例(WordCount Sorted By Key, WordCount Sorted by Values 和 PageRank算法)来对比Hadoop 和 Spark 在大数据应用中运行时间,从而观察这些研究实例随着的迭代计算次数的增加,其时间...

    一种Spark环境下的高效率大规模图数据处理机制 (2016年)

    针对现有的图处理和图管理框架存在的效率...最后通过PageRank和SSSP算法设计实验,与MapReduce框架和采用HDFS作持久层的Spark框架进行性能对比。实验证明提出的框架要比MapReduce框架快90倍,比采用HDFS作持久层的Spa

    Hadoop实战(第2版)

    数据科学.7 数据结构和算法的运用7.1 使用图进行数据建模和解决问题7.1.1 模拟图7.1.2 最短路径算法技术点52 找出两个用户间的最短距离7.1.3 friends-of-friends(FoF) 技术点53 计算FoF 7.1.4 ...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    6.4.5 在MapReduce 中优化用户的Java 代码 6.4.6 数据序列化 6.5 本章小结 第4 部分 数据科学. 7 数据结构和算法的运用 7.1 使用图进行数据建模和解决问题 7.1.1 模拟图 7.1.2 最短路径算法 技术...

    这就是搜索引擎

    • PageRank 和田rs 算法是什么关系?有何异同? SALSA 算法是什么? Hilltop 算法又 是什么?各种链接分析算法之间是什么关系? • 如何识别搜索用户的真实搜索意图?用户搜索目的可以分为几类?什么是点击图? ...

Global site tag (gtag.js) - Google Analytics