InputFormat,它描述了一个MapReduce Job的输入,通过InputFormat,Hadoop可以:
l检查MapReduce输入数据的正确性;
l将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;
l提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的<K,V>对。
在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:
lmapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;
lmapred.min.split.size:最小的划分大小;
lmapred.max.split.size:最大的划分大小;
lmapred.input.dir:输入路径,用逗号做分割。
类中比较重要的方法有:
protectedList<FileStatus> listStatus(Configuration job)
递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。
publicList<InputSplit> getSplits(JobContext context)
将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划分。注意,划分不会跨越文件。
FileInputFormat没有实现InputFormat的createRecordReader方法。
FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/io/SequenceFile.html),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createRecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。
FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名,起始偏移量,划分的长度和可能的目标机器)已经足以说明这个类的功能。
RecordReader用于在划分中读取<Key,Value>对。RecordReader有五个虚方法,分别是:
linitialize:初始化,输入参数包括该Reader工作的数据划分InputSplit和Job的上下文context;
lnextKey:得到输入的下一个Key,如果数据划分已经没有新的记录,返回空;
lnextValue:得到Key对应的Value,必须在调用nextKey后调用;
lgetProgress:得到现在的进度;
lclose,来自java.io的Closeable接口,用于清理RecordReader。
我们以LineRecordReader为例,来分析RecordReader的构成。前面我们已经分析过FileInputFormat对文件的划分了,划分完的Split包括了文件名,起始偏移量,划分的长度。由于文件是文本文件,LineRecordReader的初始化方法initialize会创建一个基于行的读取对象LineReader(定义在org.apache.hadoop.util中,我们就不分析啦),然后跳过输入的最开始的部分(只在Split的起始偏移量不为0的情况下进行,这时最开始的部分可能是上一个Split的最后一行的一部分)。nextKey的处理很简单,它使用当前的偏移量作为Key,nextValue当然就是偏移量开始的那一行了(如果行很长,可能出现截断)。进度getProgress和close都很简单。
分享到:
相关推荐
Hadoop_Genomic_Analysis 这是一个 Hadoop 版本的 HiC 数据基因组分析工具,目的是将 HiC 内部相互作用映射到基因,然后获得基因-基因相互作用网络并进行进一步研究。 请参阅 Spark 版本以获取更多详细信息。 如何...
maven编译pentaho-big-data-plugin遇到的所有问题解决,pentaho6.0
Javaweb课程作业基于Hadoop的中文词频统计工具源码+使用说明.zip 一、Linux下配置hadoop集群(伪分布式或完全分布式) 1、https://blog.csdn.net/z1148059382/article/details/89459182 2、Windows下管理HDFS的神器...
java连接sqoop源码CSV -> 镶木地板 概述 此存储库包含将 CSV 数据转换为 Parquet 格式的概念证明。 它使用 Python 的pandas库来生成 Parquet 文件,从而避免了运行 Hadoop 的需要。 用法 与码头工人: 首先,构建一...
医院挂号系统源码(含数据库) 本系统经过多年应⽤并且持续不断改进,系统各部功能已基本完善,⾮常适合⼤型医疗集团,当然⼩型医院或科室就更适合了。 ⽂件:(访问密码:551685) 以下内容⽆关: ----------------...
源码 Daily 主要日常学习得一些例子 └─src ├─main │ ├─java │ │ └─com │ │ └─skrein │ │ ├─aixcoder # idea aixcoder插件 │ │ ├─bitmap # bitmap位图算法 │ │ ├─collection │ │ ├─...
java8流源码#DataStream 通过函数式编程为大数据提供动力 大数据总是很棒。 但是使用mapper、reducer组合器编写hadoop很糟糕。 为什么我们不让我们的分布式 hadoop 生活更轻松,并将权力交还给 java 开发人员? 受...
本次实践项目利用 Hadoop 的分布式计算框架 MapReduce 来分析用户行为数据,计算得出商品点击排行、商品分类占比等统计指标,使得更加熟练掌握 MapReduce 程序的设计。 相关理论知识有,Map 和 Reduce 流程图如下:...
简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。RDD的全名是ResilientDistributedDataset,意思是容错的分布式数据集,每一个RDD都会有5个特征:1、有一个分片列表。就是能被切分,和hadoop一样...
PageRank算法的实现网页排名这是使用 Java 实现的 PageRank 算法驱动程序程序的 Main() 函数位于此文件中。 执行从这里开始。... 阻尼系数 (0.15)例子: hdfs dfs -put sample-small.txt /input hadoop jar pageran
INPUT_DIR的路径* OUTPUT_DIR的路径将克隆的存储库导入Intellij,然后在application.conf文件中的不同可用Map Reduce Jobs之间进行选择。 Top_Authors_Published_Venue Author_Most_Coauthor Author_No_Coauthor ...