MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。
hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle等几个数据库系统。
1.从Mysql读出数据
Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db中。DBInputFormat在Hadoop应用程序中通过数据库供应商提供的JDBC接口来与数据库进行交互,并且可以使用标准的SQL来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。
在使用DBInputFormat之前,必须将要使用的JDBC驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
2.MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html
DBInputFormat类中包含以下三个内置类
1.protectedclassDBRecordReaderimplementsRecordReader<LongWritable,T>:用来从一张数据库表中读取一条条元组记录。
2.2.publicstaticclassNullDBWritableimplementsDBWritable,Writable:主要用来实现DBWritable接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:
publicvoidwrite(PreparedStatementstatement)throwsSQLException;
publicvoidreadFields(ResultSetresultSet)throwsSQLException;
3.protectedstaticclassDBInputSplitimplementsInputSplit:主要用来描述输入元组集合的范围,包括start和end两个属性,start用来表示第一条记录的索引号,end表示最后一条记录的索引号.
下面对怎样使用DBInputFormat读取数据库记录进行详细的介绍,具体步骤如下:1.
DBConfiguration.configureDB(JobConfjob,StringdriverClass,StringdbUrl,StringuserName,Stringpasswd)函数,配置JDBC驱动,数据源,以及数据库访问的用户名和密码。MySQL数据库的JDBC的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。
2.
DBInputFormat.setInput(JobConfjob,Class<?extendsDBWritable>inputClass,StringtableName,Stringconditions,StringorderBy,String...fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,stringtableName表名,conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConfjob,Class<?extendsDBWritable>inputClass,StringinputQuery,StringinputCountQuery)。
3.编写MapReduce函数,包括Mapper类、Reducer类、输入输出文件格式等,然后调用JobClient.runJob(conf)。
上面讲了理论,下面举个例子:假设MySQL数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。
第一步要实现DBwrite和write数据接口。代码如下:
publicclassStudentRecordimplementsWritable,DBWritable{
intid;
Stringname;
Stringgender;
Stringnumber;
@Override
publicvoidreadFields(DataInputin)throwsIOException{
//TODOAuto-generatedmethodstub
this.id=in.readInt();
this.gender=Text.readString(in);
this.name=in.readString();
this.number=in.readString();
}
@Override
publicvoidwrite(DataOutputout)throwsIOException{
//TODOAuto-generatedmethodstub
out.writeInt(this.id);
Text.writeString(out,this.name);
out.writeInt(this.gender);
out.writeInt(this.number);
}
@Override
publicvoidreadFields(ResultSetresult)throwsSQLException{
//TODOAuto-generatedmethodstub
this.id=result.getInt(1);
this.name=result.getString(2);
this.gender=result.getString(3);
this.number=result.getString(4);
}
@Override
publicvoidwrite(PreparedStatementstmt)throwsSQLException{
//TODOAuto-generatedmethodstub
stmt.setInt(1,this.id);
stmt.setString(2,this.name);
stmt.setString(3,this.gender);
stmt.setString(4,this.number);
}
@Override
publicStringtoString(){
//TODOAuto-generatedmethodstub
returnnewString(this.name+""+this.gender+""+this.number);
}
第二步,实现Map和Reduce类
publicclassDBAccessMapperextendsMapReduceBaseimplementsMapper<LongWritable,TeacherRecord,LongWritable,Text>{
@Override
publicvoidmap(LongWritablekey,TeacherRecordvalue,
OutputCollector<LongWritable,Text>collector,Reporterreporter)throwsIOException{
//TODOAuto-generatedmethodstub
newcollector.collect(newLongWritable(value.id),newText(value.toString()));
}
}
第三步:主函数的实现,函数
publicclassDBAccessReader{
publicstaticvoidmain(String[]args)throwsIOException{
JobConfconf=newJobConf(DBAccessReader.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf,newPath("dboutput"));
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost/school","root","123456");
String[]fields={"id","name","gender","number"};
DBInputFormat.setInput(conf,StudentRecord.class,"Student",null"id",fields);
conf.setMapperClass(DBAccessMapper.class);
conf.setReducerClass(IdentityReducer.class);
JobClient.runJob(conf);
}
}
2.写数据
往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。
1.DBOutFormat:提供数据库写入接口。
2.DBRecordWriter:提供向数据库中写入的数据记录的接口。
3.DBConfiguration:提供数据库配置和创建链接的接口。
DBOutFormat提供一个静态方法setOutput(job,Stringtable,String...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为
publicstaticvoidmain(String[]args)throwsIOException{Configurationconf=newConfiguration();JobConfconf=newJobConf();
conf.setOutputFormat(DBOutputFormat.class);
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost/school","root","123456");
DBOutputFormat.setOutput(conf,"Student",456,"liqizhou","man","20004154578");
JobClient.runJob(conf);
分享到:
相关推荐
基于mapreduce的小型电影推荐系统,使用javaweb的方式实现,
利用hadoop的mapreduce把oracle/mysql中的数据导入到hbase和hdfs中的两个java程序
hadoop连接数据库查询数据,并添加到hdfs;从hdfs进行mapreduce数据导入到数据库 hadoop连接数据库查询数据,并添加到hdfs;从hdfs进行mapreduce数据导入到数据库 hadoop连接数据库查询数据,并添加到hdfs;从hdfs...
Hadoop集群·MapReduce初级案例(第9期) Hadoop集群·MySQL关系数据库(第10期) Hadoop集群·常用MySQL数据库命令(第10期副刊) Hadoop集群·HBase简介及安装(第11期) Hadoop集群·HBase之旅(第11期副刊)
======== 销售点概念验证使用 Hadoop MapReduce、Hive 和 Luigi 处理事务日志的概念证明。 'pos' 包包含处理销售点交易日志的所有代码。 计算每个用户最受欢迎的产品类别,以及每个用户每季度的收入。 preprocessing...
Hadoop_MapReduce_DataBase 注意:使用的数据库 - MySQL 该存储库包含将其输出写入 MySQL 数据库的 MapReduce 程序的源代码。 如果您正在写入远程数据库(即如果您在多节点集群上运行此程序),请确保每个从节点上...
MapReduceSkeleton介绍这是一个 MapReduce Skeleton 作业,支持 MariaDB/MySQL(和即将推出的 HBase)作为输入数据集和输出。 SQL配置在Main.java中,如果使用UPDATE方式,需要修改MysqlDBOutputFormat.java中的...
Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2...Hadoop_Hadoop集群(第9期)_MapReduce初级案例 Hadoop_Hadoop集群(第10期)_MySQL关系数据库 Web(Json-Lib类库使用手册)
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
本次要实践的数据日志来源于国内某技术学习论坛,该论坛由某培训机构主办,汇聚...使用Sqoop把Hive产生的统计结果导出到mysql中; 两个日志文件,一共有200MB,符合大数据量级,可以作为推荐系统数据集和hadoop测试集。
技术点5 使用Sqoop 从MySQL 导入数据 2.2.4 HBase 技术点6 HBase 导入HDFS 技术点7 将HBase 作为MapReduce 的数据源2.3 将数据导出Hadoop 2.3.1 将数据导入本地文件系统技术点8 自动复制HDFS 中的文件...
hadoop-env.sh 环境变量13#Hadoop Common组件 配置 core-site.xml13#HDFS NameNode,DataNode组建配置 hdfs-site.xml14#配置MapReduce - JobTracker TaskTracker 启动配置15#Hadoop单机系统,启动执行和异常检查17#...
Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,...
大数据平台:Spark(与IPython Notebook集成)和MapReduce 云服务:Amazon Web Services(Boto,AWS CLI,S3cmd等)和Heroku 通用数据存储:MySQL,PostgreSQL,MongoDB,Redis和Elasticsearch Javascript Web...
This book is a practical guide on using the Apache Hadoop projects including MapReduce, HDFS, Apache Hive, Apache HBase, Apache Kafka, Apache Mahout and Apache Solr. From setting up the environment to...
Sqoop中一大亮点就是可以通过hadoop的mapreduce把数据从关系型数据库中导入数据到HDFS。 一、安装sqoop 1、下载sqoop压缩包,并解压 压缩包分别是:sqoop-1.2.0-CDH3B4.tar.gz,hadoop-0.20.2-CDH3B4.tar.gz, ...
HADOOP 安装配置实践手册 0 Linux 基础 1 Hadoop 安装配置 2 HDFS 编程 3 MYSQL 4 HIVE 5 Sqoop 6 Storm 7 Kafka 8 RDS 9 词云
该文档是虾皮老师(博客园)自己撰写的,觉得很好就分享给大家。这个对于入门学习Hadoop是非常有帮助的。里面的文字简单易懂。 文档列表: 1)Hadoop集群_第1期_... 10)Hadoop集群_第10期_MySQL关系数据库_V1.0
18、MapReduce的计数器与通过MapReduce读取_写入数据库示例 ...本文介绍MapReduce的计数器使用以及...本文的前提依赖是hadoop可正常使用、mysql数据库中的表可用且有数据。 本文分为2个部分,即计数器与读写mysql数据库。
本项目以电影数据为主题,基于hadoop伪分布式搭建,结合hive数据仓库调用物理机mysql数据库实现电影相关数据统计、通过Mapreduce编程对hdfs文件系统的文件进行词频统计。使用python进行电影数据采集、处理、分析及...