How the parameters of a MapReduce job are set up inside Sqoop


Sqoop implements its import as a map-only MapReduce job. The key job parameters, and the classes Sqoop plugs into them, are:

1) InputFormatClass: com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
2) OutputFormatClass:
   1) TextFile: com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat
   2) SequenceFile: org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
   3) AvroDataFile: com.cloudera.sqoop.mapreduce.AvroOutputFormat
3) Mapper:
   1) TextFile: com.cloudera.sqoop.mapreduce.TextImportMapper
   2) SequenceFile: com.cloudera.sqoop.mapreduce.SequenceFileImportMapper
   3) AvroDataFile: com.cloudera.sqoop.mapreduce.AvroImportMapper
4) Task numbers:
   1) mapred.map.tasks (set from the --num-mappers option)
   2) job.setNumReduceTasks(0);

Take the following command line as an example:

    import --connect jdbc:mysql://localhost/test --username root --password 123456
           --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE $CONDITIONS"
           --target-dir /user/sqoop/test --split-by sqoop_1.id
           --hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3 --num-mappers 2

Note: each configuration key below is followed by the value Sqoop derives from this command.

1) Configuring the input: DataDrivenImportJob.configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol)
   a) DBConfiguration.configureDB(Configuration conf, String driverClass, String dbUrl, String userName, String passwd, Integer fetchSize)
      1) mapreduce.jdbc.driver.class = com.mysql.jdbc.Driver
      2) mapreduce.jdbc.url = jdbc:mysql://localhost/test
      3) mapreduce.jdbc.username = root
      4) mapreduce.jdbc.password = 123456
      5) mapreduce.jdbc.fetchsize = -2147483648
   b) DataDrivenDBInputFormat.setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputBoundingQuery)
      1) job.setInputFormatClass(DBInputFormat.class);
      2) mapred.jdbc.input.bounding.query = SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1
      3) job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class); (overrides the class set in step 1)
      4) mapreduce.jdbc.input.orderby = sqoop_1.id
   c) mapreduce.jdbc.input.class = QueryResult (the record class Sqoop generates for this query)
   d) sqoop.inline.lob.length.max = 16777216
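The input wiring above can be reproduced with the stock Hadoop classes (com.cloudera.sqoop.mapreduce.db is Sqoop's fork of org.apache.hadoop.mapreduce.lib.db). The following is a minimal sketch, not Sqoop's actual code path; it assumes the Sqoop-generated record class QueryResult is on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

    public class InputConfigSketch {
        public static Job buildImportJob() throws Exception {
            Configuration conf = new Configuration();
            // Fills in mapreduce.jdbc.driver.class/url/username/password (a.1-4).
            DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost/test", "root", "123456");
            Job job = new Job(conf, "import-sketch"); // Job.getInstance(conf) on newer Hadoop
            // Registers the record class, the import query, and the bounding query
            // (items b and c). "$CONDITIONS" is DataDrivenDBInputFormat's substitution
            // token: each map task replaces it with a range predicate over the split
            // column (here sqoop_1.id). QueryResult is the generated record class,
            // not defined in this sketch.
            DataDrivenDBInputFormat.setInput(job, QueryResult.class,
                    "select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                    + "from sqoop_1, sqoop_2 WHERE $CONDITIONS",
                    "SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM "
                    + "(select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                    + "from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1");
            job.setInputFormatClass(DataDrivenDBInputFormat.class); // b.3
            return job;
        }
    }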
2) Configuring the output: ImportJobBase.configureOutputFormat(Job job, String tableName, String tableClassName)
   a) job.setOutputFormatClass(getOutputFormatClass());
   b) FileOutputFormat.setOutputCompressorClass(job, codecClass);
   c) SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
   d) FileOutputFormat.setOutputPath(job, outputPath);

3) Configuring the mapper: DataDrivenImportJob.configureMapper(Job job, String tableName, String tableClassName)
   a) job.setOutputKeyClass(Text.class);
   b) job.setOutputValueClass(NullWritable.class);
   c) job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper.class);
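For a plain-text import, the output and map side amounts to the following sketch. It substitutes stock classes for Sqoop's internals: TextOutputFormat stands in for RawKeyTextOutputFormat, and TextImportSketchMapper is a hypothetical stand-in for TextImportMapper, which emits each record's string form as the key and NullWritable as the value:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class OutputConfigSketch {

        // Hypothetical stand-in for com.cloudera.sqoop.mapreduce.TextImportMapper.
        // The input value is the generated record class; its toString() renders
        // the delimited text line that ends up in HDFS.
        public static class TextImportSketchMapper
                extends Mapper<LongWritable, Object, Text, NullWritable> {
            private final Text line = new Text();

            @Override
            protected void map(LongWritable key, Object record, Context context)
                    throws IOException, InterruptedException {
                line.set(record.toString());
                context.write(line, NullWritable.get());
            }
        }

        public static void configureOutputAndMapper(Job job) {
            job.setOutputFormatClass(TextOutputFormat.class);                  // 2.a
            FileOutputFormat.setOutputPath(job, new Path("/user/sqoop/test")); // 2.d (--target-dir)
            job.setMapperClass(TextImportSketchMapper.class);                  // 3.c
            job.setOutputKeyClass(Text.class);                                 // 3.a
            job.setOutputValueClass(NullWritable.class);                       // 3.b
        }
    }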
4) Configuring the task numbers: JobBase.configureNumTasks(Job job)
   1) mapred.map.tasks = 2 (from --num-mappers 2)
   2) job.setNumReduceTasks(0);
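JobBase.configureNumTasks reduces to two calls. A minimal equivalent (method and class names hypothetical):

    import org.apache.hadoop.mapreduce.Job;

    public class TaskConfigSketch {
        public static void configureNumTasks(Job job, int numMapTasks) {
            // The configured map count also drives how many splits (i.e. how many
            // non-overlapping WHERE ranges) DataDrivenDBInputFormat generates.
            job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
            // A Sqoop import writes straight from the mappers, so no reduce phase.
            job.setNumReduceTasks(0);
        }
    }

With --num-mappers 2, each of the two map tasks runs the query with $CONDITIONS replaced by its half of the [MIN(sqoop_1.id), MAX(sqoop_1.id)] range returned by the bounding query.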
