`
cocoIT
  • 浏览: 48056 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

Hadoop学习笔记

阅读更多

应用开发

主要知识点如下:

Configuration类(支持overwrite,variable$)

测试(mock单元测试,本地测试,集群测试)

Tool,ToolRunner

集群测试(package,启动job,JobwebUIfornamenodeandjobtracker)

运程调试器(keep.failed.task.files=true,使用ISolationRunner)

作业调优(HPROF)

MapReduce工作流(oozie)

1.在本地运行测试数据

publicclassMaxTemperatureDriverextendsConfiguredimplementsTool{

publicintrun(String[]args)throwsException{

Jobjob=newJob(getConf(),“computemaxtemperature”);

job.setJarByClass();

job.setMapperClass();

job.setReducerClass();

FileInputFormat.addInputPath(job,newPath(args[0]));

FileOutputFormat.addOutputPath(job,newPath(args[1]));

returnjob.waitForCompletion(true);

}

publicstaticvoidmain(String[]args){

intexitCode=ToolRunner.run(newMaxTemperatureDriver(),args);

System.exit(exitCode);

}

}

编译上面的代码,在根节点处运行hadoop命令(事先将hadoop进程在本地启动):

hadoopMaxTemperature–confconf/hadoop-local.xmlinput/ncdcmax-temp

2.集群上运行

使用jar命令将class文件打包,然后使用jar命令上传并启动任务(事先将hadoop在集群中启动):

%hadoopjarjob.jarMaxTempratureDriver–confconf/hadoop-cluster.xmlinputoutput

3.Hadoop守护进程的地址和端口

RPC

namenodeRPC地址和端口hdfs://localhost:8020(fs.default.name)

jobtrackerRPC地址和端口localhost:8021(mapred.job.tracker)

datanodeTCP/IP服务器(块传输)50010(dfs.datanode.address)

datanodeRPC地址和端口localhost:50020(dfs.datanode.ipc.address)

tasktrackerRPC地址和端口(mapred.task.tracker.report.address)

HTTP

jobtracker50030(mapred.job.tracker.http.address)

tasktracker50060(mapred.task.tracker.http.address)

namenode50070(dfs.http.address)

datanode50075(dfs.datanode.http.address)

secondary50090(dfs.secondary.http.address)

4.作业调试(计数器和状态)

在map/reduce程序中可以通过计数器和状态来记录数据中的一些状态,可以通过webUI或脚本指令来查看运行后的计数器或状态。

context.setStatus(“”);

context.incrCounter(Stringgroup,Stringcounter,intnum);

命令行查询计数器:

%hadoopjob–counterjob_201111160811_0003‘MaxTemperatureMaper$Temperature’ENUM

远程调试器

在集群上运行作业很难调试,但是可以配置Hadoop保留作业运行期间产生的所有中间值,以便稍后在调试器上重新运行这些出错的任务。

1)设置属性保留中间数据keep.failed.task.files=true

2)运行作业,在web界面上查看故障节点和task_attempt_ID;

3)通过上面的ID来查找保存的中间数据文件。mapred.local.dir定义了本地缓存目录,在指定的一个或多个目录下寻找对应的job_id下的task_temp_id目录,下面存放着job.xml,map输入的序列化文件,map输出备份(在output目录下),和work目录(task_attempt的工作目录)。

4)在脚本控制台cd到上面的work目录,设置运程调试器属性并启动hadoop进入debug模式:

%exportHADOOP_OPTS=”-agentlib:jdwp=transfport=dt_socket,server=y,suspend=y,address=8787”

%hadooporg.apache.hadoop.mapred.IsolationRunner../job.xml

5)在运程客户端启动JavaIDE如Eclipse远程连接上面主机的8787端口,在map/reduce源代码中设置断点等待。

上述调试技术不只适用于失败的任务,还可以保留成功完成的任务数据来调试内部逻辑。这是,可将属性keep.task.files.pattern设置为一个正则表达式(与保留的任务ID匹配)。

其它一些调试的技巧:

在linux下dumpJavathreadstacktrace

如果是在控制台中运行,则直接ctrl+\

如果是在后台运行,可以先找到运行java的pid,然后kill-QUITPID,会将threadstack内容输出到该java进程的标准输出流里,例如tomcat就会写在catalina.out里。

jstack[-l]pid

如果java程序崩溃生成core文件,jstack工具可以用来获得core文件的javastack和nativestack的信息,从而可以轻松地知道java程序是如何崩溃和在程序何处发生问题。另外,jstack工具还可以附属到正在运行的java程序中,看到当时运行的java程序的javastack和nativestack的信息,如果现在运行的java程序呈现hung的状态,jstack是非常有用的。

5作业调优

哪些因素影响作业的运行效率?

mapper的数量:尽量将输入数据切分成数据块的整数倍。如有太多小文件,则考虑CombineFileInputFormat;

reducer的数量:为了达到最高性能,集群中reducer数应该略小于reducer的任务槽数。

combiner:充分使用合并函数减少map和reduce之间传递的数据量,combiner在map后运行;

中间值的压缩:对map输出值进行压缩减少到reduce前的传递量(conf.setCompressMapOutput(true)和setMapOutputCompressorClass(GzipCodec.class));

自定义序列:如果使用自定义的Writable对象或自定义的comparator,则必须确保已实现RawComparator

调整shuffle:MapReduce的shuffle过程可以对一些内存管理的参数进行调整,以弥补性能不足;

另一个有用的方法是启用JDK的HPROF分析来获取程序的CPU和堆栈使用情况。

conf.setProfileEnabled(true);//“mapred.task.profile”

conf.setProfileParams(“-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s”);//“mapred.task.profile.params”

conf.setProfileTaskRange(true,“0-2”);//第一个参数表示map,false则分析reduce;第二个参数任务ID范围

将上述程序加入驱动程序后重新运行,分析结果将输出到作业日志的末尾。

MapReduce工作机制

知识点小结:

shuffle影响性能的因素

1Map–>buffer–>partition,sort,spilltodisk(输出缓冲区,溢出写磁盘比例,运行combiner最小溢出写文件数3,tasktracker工作线程数)

2Reduce

copy(5threads)–>memory(buffersize)–>disk(threhold)–>merge–>reduce

1剖析MapReduce作业运行机制

1.1作业的提交

客户端通过JobClient.runJob()来提交一个作业到jobtracker,JobClient程序逻辑如下:

a)向Jobtracker请求一个新的jobid(JobTracker.getNewJobId());

b)检查作业的输出说明,如已存在抛错误给客户端;计算作业的输入分片;

c)将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以jobid命名的目录下。作业jar副本较多(mapred.submit.replication=10);

d)告知jobtracker作业准备执行(submitjob)。

1.2作业的初始化

jobtracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由jobscheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象(封装任务和记录信息)。

为了创建任务运行列表,jobscheduler首先从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务;创建的reduce任务数量由JobConf的mapred.reduce.task属性决定,schedule创建相应数量的reduce任务。任务此时被执行ID。

1.3任务的分配

jobtacker应该先选择哪个job来运行?这个由jobscheduler来决定,下面会详细讲到。

jobtracker如何选择tasktracker来运行选中作业的任务呢?

每个tasktracker定期发送心跳给jobtracker,告知自己还活着,是否可以接受新的任务。jobtracker以此来决定将任务分配给谁(仍然使用心跳的返回值与tasktracker通信)。每个tasktracker会有固定数量的任务槽来处理map和reduce(比如2,表示tasktracker可以同时运行两个map和reduce),由机器内核的数量和内存大小来决定。jobtracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。

jobtracker选择哪个tasktracker来运行map任务需要考虑网络位置,它会选择一个离输入分片较近的tasktracker,优先级是数据本地化(data-local)–>机架本地化(rack-local)。

对于reduce任务,没有什么标准来选择哪个tasktracker,因为无法考虑数据的本地化。map的输出始终是需要经过整理(切分排序合并)后通过网络传输到reduce的,可能多个map的输出会切分出一部分送给一个reduce,所以reduce任务没有必要选择和map相同或最近的机器上。

1.4任务的执行

1.tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的本地文件系统(jar本地化用来启动JVM)。同时将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。

2.接下来tasktracker为任务新建一个本地工作目录work,并把jar文件的内容解压到这个文件夹下。

3.tasktracker新建一个taskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每个任务,以便客户的map/reduce不会影响tasktracker守护进程。但在不同任务之间重用JVM还是可能的。子进程通过umbilical接口(?什么含义,暂时未知)与父进程进行通信。任务的子进程每隔几秒便告知父进程的进度,直到任务完成。

Streaming和Pipes是用来运行其它语言编写的map和reduce。Streaming任务特指任务使用标准输入输出steaming与进程通信,可以是任何语言编写的。pipes特指C++语言编写的任务,其通过socket来通信(persistentsocketconnection)。

1.5进度和状态的更新

一个作业和每个任务都有一个状态信息,包括:作业或任务的运行状态(running,successful,failed),map和reduce的进度,计数器值,状态消息或描述。

这些信息通过一定的时间间隔由childJVM–>tasktracker–>jobtracker汇聚。jobtracker将产生一个表明所有运行作业及其任务状态的全局试图。你可以通过WebUI查看。同时JobClient通过每秒查询jobtracker来获得最新状态。

1.6作业的完成

1.7作业的失败

2.作业的调度

默认调度器–基于队列的FIFO调度器

公平调度器(FairScheduler)-每个用户都有自己的作业池,用map和reduce的任务槽数来定制作业池的最小容量,也可以设置每个池的权重。FairScheduler支持抢占,如果一个池在特定的一段时间内未得到公平的资源共享,它会中止运行池得到过多资源的任务,以便把任务槽让给运行资源不足的池。启动步骤:

1)拷贝contrib/fairscheduler下的jar复制到lib下;

2)mapred.jobtracker.taskScheduler=org.apache.hadoop.mapred.FairScheduler

3)重启节点hadoop

能力调度器(CapacityScheduler)-

3.shuffle和排序

shuffle特指map输出后到reduce运行前得到输入的整个过程,它是MapReduce的心脏,属于不断被优化和改进的代码库的一部分,下面主要针对0.20版本。

Map端

1)Map输出首先放在内存缓冲区(io.sort.mb属性定义,默认100MB);

2)守护进程会将缓冲区的数据按照目标reducer划分成不同的分区(partition),同时按键进行内排序;如果客户端定义了combiner,则combiner会在排序后运行,继续压缩缓存区的数据;

3)缓冲区上定义了一个阈值(io.sort.spill.percent,默认为0.8),当存储内容达到这个值时,缓冲区的值会被写到本地文件中(mapred.local.dir定义,可以是一个或多个目录);这种文件会有多个,每个的内容都是按照reducer分区且局部排序的。这个过程简称spilltodisk;

4)Map输出完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件中,合并会分多次,每次合并的中间文件个数有io.sort.factor来定义,默认是10;这个过程也会伴随着combiner的运行,min.num.spills.for.combine定义了运行combiner之前溢出写的次数;

5)写磁盘时可以压缩文件。mapred.compress.map.output设置为true,mapred.map.output.compression.codec指定压缩实现类;

map任务完成后,会通知父tasktracker状态已更新,然后tasktracker通过心跳通知jobtracker。下面的reduce所在的tasktracker有一个线程定期询问jobtracker以便获得map输出的位置,直到它获得所有输出的位置。

Reduce端

1)每个map任务的完成时间可能不同,但只要有一个任务完成,reduce任务得知后就开始复制对应它的输出,复制线程数由mapred.reduce.parallel.copies定义,默认为5;

2)如果map输出相当小,则不用复制到文件中,而是reducetasktracker的内存中。缓冲区大小由mapred.job.shuffle.input.buffer.percent定义用于此用途的堆空间的百分比,默认0.7;一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent,默认值为0.66)或达到reduce输出阈值(mapred.inmem.merge.threshold,默认值为1000),则合并后溢出写到磁盘中;

3)随着磁盘上副本的增多,后台线程会将它们合并为更大的排好序的文件。为了合并,压缩的map输出必须在内存中被解压缩;

4)复制完所有的map输出后,reduce任务进入合并阶段(sortphase,合并多个文件,并按键排序)。io.sort.factor定义了每次合并数,默认为10,即每10个map输出合并一次。会有很多个合并后的中间文件。

5)最后直接把中间文件数据输入给reduce函数,对已排序输出中的每个键都要调用reduce函数,此阶段的输出直接写到HDFS中。

配置的调优

总原则:给shuffle过程尽量多提供内存空间,但也要确保map函数和reduce函数能得到足够的内存。

运行map和reduce任务的JVM内存大小有mapred.child.java.opts属性设置。

在map端,避免多次溢出写磁盘来获得最佳性能。计数器spilled.records计算在作业运行整个阶段中溢出写磁盘的记录数,大则表明写磁盘太频繁;

在reduce端,中间数据全部驻留在内存中就能得到最佳性能。如果reduce函数的内存需求不大,那么把mapred.inmem.merg.threshold设置为0,把mapred.job.reduce.input.buffer.percent设置为1会带来性能的提升。

4.任务的执行

Hadoop发现一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份,即“推测执行”(speculativeexecution)。

推测执行是一种优化措施,并不能使作业运行更可靠。默认启用,但可以单独为map/reduce任务设置,mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution。开启此功能会减少整个吞吐量,在集群中倾向于关闭此选项,而让用户根据个别作业需要开启该功能。

Hadoop为每个任务启动一个新JVM需要耗时1秒,对于大量超短任务如果重用JVM会提升性能。当启用JVM重用后,JVM不会同时运行多个任务,而是顺序执行。tasktracker可以一次启动多个JVM然后同时运行,接着重用这些JVM。控制任务重用JVM的属性是mapred.job.reuse.jvm.num.tasks,它指定给定作业每个JVM运行的任务的最大数,默认为1,即无重用;-1表示无限制即该作业的所有的任务都是有一个JVM。

在map/reduce程序中,可以通过某些环境属性(Configuration)得知作业和任务的信息。

mapred.job.id作业ID,如job_201104121233_0001

mapred.tip.id任务ID,如task_201104121233_0001_m_000003

mapred.task.id任务尝试ID,如attempt_201104121233_0001_m_000003_0

mapred.task.partition作业中任务的ID,如3

mapred.task.is.map此任务是否为map任务,如true

MapReduce类型和格式

1.MapReduce的类型

map(K1,V1)–>list(K2,V2)//对输入数据进行抽取过滤排序等操作

combine(K2,list(V2))–>list(K2,V2)//为了减少reduce的输入,需要在map端对输出进行预处理,类似reduce。不是所有的reduce都在部分数据集上有效,比如求平均值就不能简单用于combine

partition(K2,V2)–>integer//将中间键值对划分到一个reduce分区,返回分区索引号。分区内的键会排序,相同的键的所有值会合成一个组(list(V2))

reduce(K2,list(V2))–>list(K3,V3)//每个reduce会处理具有某些特性的键,每个键上都有值的序列,是通过对所有map输出的值进行统计得来的;当获得一个分区后,tasktracker会对每条记录调用reduce。

默认的map和reduce函数是IdentityMapper和IdentityReducer,均是泛型类型,简单的将所有输入写到输出中。默认的partitioner是HashPartitioner,对每天记录的键进行哈希操作以决定该记录属于那个分区让reduce处理。

输入数据的类型有输入格式(InputFormat类)进行设置,其它的类型通过JobConf上的方法显示设置。这里显式设置中间和最终输出类型的原因是因为Java语言的泛型实现是typeerasure。另外如果K2和K3是相同类型,就不需要调用setMapOutputKeyClass(),因为它将调用setOutputKeyClass()来设置。

2.输入格式

2.1输入分片与记录

一个输入分片(split)是由单个map处理的输入块(分片个数即map所需的tasktracker个数),每个分片包含若干记录(key+value),map函数依次处理每条记录。输入分片表示为InputSplit接口,其包含一个以字节为单位的长度和一组存储位置,分片不包含数据本身,而是指向数据的引用。

InputSplit是由InputFormat创建的,一般无需应用开发人员处理。InputFormat负责产生输入分片并将它们分割成记录。

1)JobClient调用InputFormat.getSplites()方法,传入预期的map任务数(只是一个参考值);

2)InputFormat计算好分片数后,客户端将它们发送到jobtracker,jobtracker便使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。

3)在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的RecordReader;RecordReader基本上就是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后在传给map函数。

2.2FileInputFormat

输入路径可由多个函数FileInputFormat.addInputPath()指定,还可以利用FileInputFormat.setInputPathFilter()设置过滤器。输入分片的大小有上个属性控制:分片最小字节数,分片最大字节数和HDFS数据块字节数。

mapred.min.split.size,mapred.max.split.size,dfs.block.size

计算公式是:

max(minSplitSize,min(maxSplitSize,blockSize))

没有特殊需求,应该尽量让分片大小和数据块大小一致。如果HDFS中存在大批量的小文件,则需要使用CombineFileInputFormat将多个文件打包到一个分片中,以便mapper可以处理更多的数据。一个可以减少大量小文件的方法(适合于小文件在本地文件系统,在上传至HDFS之前将它们合并成大文件)是使用SequenceFile将小文件合并成一个或多个大文件,可以将文件名作为键,文件内容作为值。

有时候不希望输入文件被切分,只需覆盖InputFormat的isSplitable()方法返回false即可。

有时候map程序想知道正在处理的分片信息,可以通过Configuration中的属性得到,包括map.input.file(正在处理的输入文件的路径),map.input.start(分片开始处的字节偏移量),map.input.length(分片的字节长度)。

有时候map想访问一个文件的所有内容,需要一个RecordReader来读取文件内容作为record的值。可行的方法是实现一个FileInputFormat的子类,将文件标记为不可切分,同时指定一个特定的RecordReader;该RecordReader只是在第一次next()时返回文件的内容。

2.3文本输入

TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量;值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。由于一行的长度不定,所以极易出现split分片会跨越HDFS的数据块。

KeyValueTextInputFormat将文件的每一行看作一个键值对,使用某个分界符进行分隔,比如制表符。Hadoop默认输出的TextOutputFormat格式即键值对为一行组成一个文件,处理这类文件就可以使用键值文本输入格式。

NLineInputFormat可以保证map收到固定行数的输入分片,键是文件中行的字节偏移量,值是行内容。默认为1,即一行为一个分片,送给每个map。

2.4二进制输入

SequenceFileInputFormat存储二进制的键值对的序列。顺序文件SequenceFile是可分割的,也支持压缩,很符合MapReduce数据的格式。

2.5多种输入

Hadoop也支持在一个作业中对不同的数据集进行连接(join),即定义多个不同的数据输入源,每个源对应不同的目录、输入格式和Map函数。

MultipleInputs.addInputpath(conf,inputPath,TextInputFormat.class,MaxTemperatureMapper.class);

2.6数据库输入和输出

DBInputFormat用于使用JDBC从关系数据库中读取数据,但只适合少量的数据集。如果需要与来自HDFS的大数据集连接,要使用MultipleInputs。

在关系数据库和HDFS之间移动数据的另一个方法是Sqoop。

HBase和HDFS之间移动数据使用TableInputFormat和TableOutputFormat。

3.输出格式

TextOutputFormat是默认的输出格式,它把每条记录写为文本行,键和值可以是任意类型。

SequenceFileOutputFormat将输出写入一个顺序文件,是二进制格式。MapFileOutputFormat把MapFile作为输出,键必须顺序添加,所以必须确保reducer输出的键已经排好序。

FileOutputFormat及其子类产生的文件放在输出目录下,每个reducer一个文件并且文件由分区号命名,如part-00000,part-00001等。有时候需要对文件名进行控制,或让每个reduce输出多个文件,则可使用MultipleOutputFormat和MultipleOutputs类。

MultipleFileOuputFormat可以将数据写到多个文件,关键是如何控制输出文件的命名。它有两个子类:MultipleTextOutputFormat和MultipleSequenceFileOutputFormat。在使用多文件输出时,只需实现它们任何一个的子类,并覆盖generateFileNameForKeyValue()返回输出文件名。

MultipleOutputs类不同的是,可以为不同的输出产生不同的类型。

MultipleOutputs.addMultiNameOutput(conf,“name”,TextOutputFormat.class,KeyClass,valueClass);

新版本Hadoop中上述两个多输出类也合并。

FileOutputFormat的子类会产生输出文件,即使文件是空的。可以使用LazyOutputFormat来去除空文件。
MapReduce的特性

这章主要总结MapReduce的高级特性,包括计数器,数据集的排序和连接。

1.计数器

计数器是一种收集作业统计信息的有效手段,由于质量控制或应用统计。计数器还可辅助诊断系统故障。

Hadoop为每个作业维护若干内置计数器,以描述该作业的各项指标。计数器由关联任务维护,并定期(3秒)传到tasktracker,再由tasktracker传给jobtracker(5秒,心跳)。一个任务的计数器值每次都是完整传输的,而非增量值。

MapReduce允许用户编写程序定义计数器,一般是由一个Java枚举(enum)类型定义。枚举类型的名称即计数器组名称,枚举类型的字段即计数器名称。计数器在作业实例级别是全局的,MapReduce框架会跨所有的map和reduce来统计这些计数器,并在作业结束时产生一个最终的结果。

enumTemperature{

MISSING,MAlFORMED

}

context.incrCounter(Temperature.MISSING,1);

MapReduce同时支持非枚举类型的动态计数器。

context.incrContext(Stringgroup,Stringcounter,intamount);

计数器可以通过很多方式获取,Web界面和命令行(hadoopjob-counter指令)之外,用户可以用JavaAPI获取计数器的值。

RunningJobjob=jobClient.getJob(JobID.forName(id));

Counterscounters=job.getCounters();

longmissing=counters.getCounter(MaxTemperatue.Temperature.MISSING);

2.排序

排序是MapReduce的核心技术,尽管应用程序本身不需要对数据排序,但可以使用MapReduce的排序功能来组织数据。默认情况下,MapReduce根据输入记录的键对数据排序。键的排列顺序是由RawComparator控制的,规则如下:

1)若属性mapred.output.key.comparator.class已设置,则使用该类的实例;

2)否则键必须是WritableComparable的子类,并使用针对该键类的已登记的comparator;

3)如果还没有已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。

全排序

如何用Hadoop产生一个键全局排序的文件?(最好的回答是使用Pig或Hive,两者均可使用一条指令进行排序)

大致方法是,想办法创建一系列排好序的文件,而且这些文件直接也是排序的,比方说第一个文件的值都不第二个文件的值小,则简单的拼装这些文件就可以得到全局排序的结果。问题是如何划分这些文件,并把原始文件的值放入这些排序的文件中?可以使用map的partition来将某一范围的键放入对于的reduce,每个reduce的输入可以保证已排序(局部排序),默认直接输出到part-000×,那所有这些输出组合成一个文件就是全局排序的。为了得到合适的范围,需要对所有输入数据进行统计,实际做法是通过抽样,Hadoop提供InputSampler和IntervalSampler。使用抽样函数事先对input数据进行抽样,得到抽样范围,然后将范围写入分布式缓存,供集群上其它任务使用。

DistributedCache.addCacheFile(cacheFile,conf);

DistributedCache.createSymlink(conf);

辅助排序

MapReduce框架在记录达到reducer之前按键对记录排序,但键所对应的值并没有排序。大多情况下不需考虑值在reduce函数中的出现顺序,但是,有时也需要通过对键进行排序和分组等以实现对值的排序。

例子:设计一个MapReduce程序以计算每年最高气温。

1)使用组合键IntPair,将年份和气温都作为键;

2)按照年份来分区和分组,但排序需要按照年份升序和气温降序。

conf.setPartitionerClass();

conf.setOutputKeyComparatorClass();

conf.setOutputValueGroupingComparator();

3连接

MapReduce能执行大型数据集间的“连接”操作。

Map端连接指在数据到达map函数之前就执行连接操作。为达到此目的,各map的输入数据必须先分区并且以特定方式排序。各个数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。

map连接操作可以连接多个作业的输出,只要这些作业的reduce数量相同,键相同,并且输出文件是不可切分的(如小于HDFS块大小,或gzip压缩)。利用org.apache.mapred.join包中的CompositeInputFormat类来运行一个map端连接,其输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置。

Reduce连接不要求数据集符合特定结构,因此比Map连接更为常用。但是,由于数据集均经过mapReduce的shuffle过程,所以reduce端连接的效率往往更低一些。基本思路是mapper为各个记录标记源,并且使用连接键作为map输出键,使键相同的记录放在同一个reducer中。

1)可以使用MultipleInputs来解析和标注各个源;

2)先将某一个数据源传输到reduce。举天气数据为例,气象站信息(气象站id和名字)以气象站ID+“0”为组合键,名字为值,但是按照ID来分区和分组;气象站天气情况(气象站id,时间和气温)以气象站ID+“1”为组合键,气温为值,但是按照ID来分区和分组。两组数据经过不同的map之后,具有相同的ID的记录被合并作为一个记录输入reduce程序,值列表中的第一个是气象站名称,其余的记录都是温度信息。reduce程序只需要取出一个值,并将其作为后续每条输出记录的一部分写到输出文件即可。

conf.setPartitionerClass();

conf.setOutputValueGroupingComparator(Textpair.FirstComparator.class);

4边数据分布(sidedata)

边数据是作业所需的额外的只读数据,已辅助处理主数据集。面临的挑战是如何让所有的map和reduce都能方便高效地使用边数据。

1)如果仅需向任务传递少量元数据,则可以通过Configuration来设置每个job的属性,则map/reduce可以覆盖configure()方法来获取这些元数据值。如果你设置的值是复杂对象,则需要处理序列化工作。在几百个作业同在一个系统中运行的情况下,这种方法会增多内存开销,而且元数据信息在所有节点都缓存,即使在不需要它的jobtracker和tasktracker上。

2)针对小数据量边数据的常用办法是将在map/reduce数据缓存在内存中,并通过重用JVM使tasktracker上同一个作业的后续任务共享这些数据。

3)分布式缓存(-files,-archives)

a)启动作业时,使用files或archives传入元数据文件路径,

%hadoopjarjob.jarMaxTempratureSample–fileinput/metadata/stations-fixed-width.txtinput/alloutput

b)当tasktracker获得任务后,首先将jobtracker中的上述文件复制到本地磁盘,具体在${mapred.local.dir}/taskTracker/archive,缓存的容量是有限的,默认10GB,可以通过local.cache.size来设置。

c)在map/reduce程序中,直接读取“stations-fixed-width.txt”文件。同时可以通过JobConf.getLocalCacheFiles()和JobConf.getLocalCacheArchives()来获取本地文件路径的数组。

5MapReduce类库

Hadoop还提供了一个MapReduce类库,方便完成常用的功能。

ChainMapper,ChainReducer在一个MapReduce中运行多个mapper或reducer。(M+RM*)

IntSumReducer,LongSumReducer对各键的所有整数值进行求和操作的reducer

TokenCounterMapper输出各单词及其出现的次数

RegexMapper检查输入值是否匹配某正则表达式,输出匹配字符串和计数器值

分享到:
评论

相关推荐

    3.Hadoop学习笔记.pdf

    hadoop学习笔记,hadoop简介,适用于hadoop入门,讲解hadoop安装,使用,基本原理,大数据,分布式等概念

    Hadoop 学习笔记.md

    Hadoop 学习笔记.md

    最新Hadoop学习笔记

    hadoop 学习笔记,从搭建环境开始到具体实验。包括hdfs配置,yarn配置,分布式配置,如何编写mapreuduce 一步一步手把手,最后项目是hadoop 与 javaweb

    hadoop学习笔记

    我学习hadoop的笔记,并在公司做的报告,给大家共享下

    Hadoop学习笔记整理

    Hadoop学习笔记

    hadoop学习笔记.rar

    hadoop学习笔记.rarhadoop学习笔记.rarhadoop学习笔记.rarhadoop学习笔记.rarhadoop学习笔记.rarhadoop学习笔记.rarhadoop学习笔记.rar

    Hadoop学习笔记AAAAAAAAAAA

    Hadoop学习笔记AAAAAAAAAAA

    HADOOP学习笔记

    云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建云计算平台的搭建

    云计算hadoop学习笔记

    云计算,hadoop,学习笔记, dd

    hadoop学习笔记(三)

    踏入hadoop的世界,一个不一样的世界

    Hadoop学习笔记.pdf

    Hadoop 适合初学者 Hadoop2.0 hbase 什么时候用HBase: 确信有足够多数据 确信可以不依赖所有RDBMS的额外特性(列数据类型/第二索引/事物/高级查询语言) 确信有足够硬件

Global site tag (gtag.js) - Google Analytics