`
cocoIT
  • 浏览: 48754 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

Hadoop,MapReduce 操作 Mysql

 
阅读更多
MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。

hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle等几个数据库系统。

1.从Mysql读出数据

Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db中。DBInputFormat在Hadoop应用程序中通过数据库供应商提供的JDBC接口来与数据库进行交互,并且可以使用标准的SQL来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。

在使用DBInputFormat之前,必须将要使用的JDBC驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。

2.MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html


DBInputFormat类中包含以下三个内置类
1.protectedclassDBRecordReaderimplementsRecordReader<LongWritable,T>:用来从一张数据库表中读取一条条元组记录。
2.2.publicstaticclassNullDBWritableimplementsDBWritable,Writable:主要用来实现DBWritable接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:

publicvoidwrite(PreparedStatementstatement)throwsSQLException;
publicvoidreadFields(ResultSetresultSet)throwsSQLException;
3.protectedstaticclassDBInputSplitimplementsInputSplit:主要用来描述输入元组集合的范围,包括start和end两个属性,start用来表示第一条记录的索引号,end表示最后一条记录的索引号.


下面对怎样使用DBInputFormat读取数据库记录进行详细的介绍,具体步骤如下:1.
DBConfiguration.configureDB(JobConfjob,StringdriverClass,StringdbUrl,StringuserName,Stringpasswd)函数,配置JDBC驱动,数据源,以及数据库访问的用户名和密码。MySQL数据库的JDBC的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。

2.
DBInputFormat.setInput(JobConfjob,Class<?extendsDBWritable>inputClass,StringtableName,Stringconditions,StringorderBy,String...fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,stringtableName表名,conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConfjob,Class<?extendsDBWritable>inputClass,StringinputQuery,StringinputCountQuery)。

3.编写MapReduce函数,包括Mapper类、Reducer类、输入输出文件格式等,然后调用JobClient.runJob(conf)。

上面讲了理论,下面举个例子:假设MySQL数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。

第一步要实现DBwrite和write数据接口。代码如下:

publicclassStudentRecordimplementsWritable,DBWritable{

intid;

Stringname;

Stringgender;

Stringnumber;
@Override
publicvoidreadFields(DataInputin)throwsIOException{

//TODOAuto-generatedmethodstub

this.id=in.readInt();
this.gender=Text.readString(in);

this.name=in.readString();

this.number=in.readString();

}
@Override
publicvoidwrite(DataOutputout)throwsIOException{

//TODOAuto-generatedmethodstub

out.writeInt(this.id);
Text.writeString(out,this.name);

out.writeInt(this.gender);
out.writeInt(this.number);
}
@Override
publicvoidreadFields(ResultSetresult)throwsSQLException{

//TODOAuto-generatedmethodstub

this.id=result.getInt(1);

this.name=result.getString(2);

this.gender=result.getString(3);

this.number=result.getString(4);

}
@Override
publicvoidwrite(PreparedStatementstmt)throwsSQLException{

//TODOAuto-generatedmethodstub

stmt.setInt(1,this.id);

stmt.setString(2,this.name);

stmt.setString(3,this.gender);

stmt.setString(4,this.number);

}
@Override
publicStringtoString(){
//TODOAuto-generatedmethodstub
returnnewString(this.name+""+this.gender+""+this.number);

}

第二步,实现Map和Reduce类

publicclassDBAccessMapperextendsMapReduceBaseimplementsMapper<LongWritable,TeacherRecord,LongWritable,Text>{

@Override

publicvoidmap(LongWritablekey,TeacherRecordvalue,
OutputCollector<LongWritable,Text>collector,Reporterreporter)throwsIOException{
//TODOAuto-generatedmethodstub

newcollector.collect(newLongWritable(value.id),newText(value.toString()));

}

}


第三步:主函数的实现,函数
publicclassDBAccessReader{

publicstaticvoidmain(String[]args)throwsIOException{

JobConfconf=newJobConf(DBAccessReader.class);

conf.setOutputKeyClass(LongWritable.class);

conf.setOutputValueClass(Text.class);

conf.setInputFormat(DBInputFormat.class);

FileOutputFormat.setOutputPath(conf,newPath("dboutput"));

DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost/school","root","123456");

String[]fields={"id","name","gender","number"};

DBInputFormat.setInput(conf,StudentRecord.class,"Student",null"id",fields);

conf.setMapperClass(DBAccessMapper.class);

conf.setReducerClass(IdentityReducer.class);

JobClient.runJob(conf);

}

}


2.写数据


往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。
1.DBOutFormat:提供数据库写入接口。
2.DBRecordWriter:提供向数据库中写入的数据记录的接口。
3.DBConfiguration:提供数据库配置和创建链接的接口。
DBOutFormat提供一个静态方法setOutput(job,Stringtable,String...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为

publicstaticvoidmain(String[]args)throwsIOException{Configurationconf=newConfiguration();JobConfconf=newJobConf();
conf.setOutputFormat(DBOutputFormat.class);

DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost/school","root","123456");

DBOutputFormat.setOutput(conf,"Student",456,"liqizhou","man","20004154578");

JobClient.runJob(conf);


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics