MapReduce(十六): 写数据到HDFS的源码分析,mapreducehdfs



1)   LineRecordWriter负责把Key,Value的形式把数据写入到DFSOutputStream


2)   DFSOutputStream负责把LineRecordWriter写入的数据发送到Datanode中。对LineRecordWriter写入的数据首先按照一个个chunk分割,然后打包成Packet发送给datanode,datanode负责对接收的数据备份到其它datanode上。发送数据时,发送线程和结果接收线程分开,对发送队列的数据完成发送后,转移的结果接收队列中,等待datanode对数据保存,备份是否正确的应答,如果datanode保存失败,则对datanode,包括备份datanode的数据恢复到失败的前一刻,然后继续向datanode发送数据保存。

把LineRecordWriter中写入的数据先放到缓存中: 


Buffer满了后,就把数据按照chunk封装到packet中,packet写满后就把packet放到发送队列中,等待发送线程发送给datanode



3)   DataStreamer负责把队列中放入packet的数据发送到datanode上

 



4)   ReponseProcessor负责对发送的packet数据接收datanode处理应答


5)   Block传输写协议



问在Hadoop的HDFS中,是怎把文件分割后的block分散到一个个的datanode中,有源代码的相关方法说明最好

(1)文件分割后,会有一个 文件 --> block的映射,这个映射是持久化到硬盘中的,具体的映射关系表是在FSNamesystem.java中构建的(该部分的构建使用的是FSDirectory.java的功能,filename - blockset);
有了文件到块的映射表就可以通过文件找到blocklist;
(2)datanode的选取,hadoop有它本身的机制,一般来说,datanode默认是三个,选取的是不同机架的datanode,同机架里选一台,另一个机架里选取两台(安全性等考虑);
(3)block写入datanodes,选取的三个datanode,比如说是A、B、C,先写给A,A再写给B,B再写给C;然后B收到C的写入成功,A收到B的写入成功,然后告诉namenode 和 client写入成功;
(4)真正写的并不是block,而是比block更小的好像是chunk , 还包括有各种校验。

给你参考一下。
 

怎在代码中提交Mapreduce作业?

MapReduce作业提交源码分析
我们在编写MapReduce程序的时候,首先需要编写Map函数和Reduce函数。完成mapper和reducer的编写后,进行Job的配置;Job配置完成后,调用Job.submit()方法完成作业的提交。那我们思考一下,Job最终如何完成作业(job)的提交呢?粗略想一下,Job必然需要通过某种方式连接到JobTracker,因为只有这样才能将job提交到JobTracker上进行调度执行。还需要考虑一下,我们自己编写的mapper和reducer,即Jar文件如何传送到JobTracker上呢?其中有一种最简单也比较直观的方法,直接通过socket传输给JobTracker,由JobTracker再传输给TaskTracker(注意:MapReduce并没有采用这种方法)。第三个需要考虑的内容是,JobTracker如何将用户作业的配置转化成map task和reduce task。下面我们来分析一下MapReduce这些功能的实现。
首先在class Job内部通过JobClient完成作业的提交,最终由JobClient完成与JobTracker的交互功能。在JobClient的构造函数中,通过调用RPC完成与JobTracker连接的建立。
完成建立后,JobClient首先确定job相关文件的存放位置(我们上面提到mapreduce没有采用将jar即其他文件传输给JobTracker的方式,而是将这些文件保存到HDFS当中,并且可以根据用户的配置存放多份)。至于该存放目录的分配是通过调用RPC访问JobTracker的方法来进行分配的,下面看一下JobTracker的分配代码:
final Path stagingRootDir = new Path(conf.get(
"mapreduce.jobtracker.staging.root.dir",
"/tmp/Hadoop/mapred/staging"));
final FileSystem fs = stagingRootDir.getFileSystem(conf);
return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();

注意上面代码所生成的stagingRootDir是所有job文件的存放目录,是一个根目录,并不单指当前job。
完成job存放目录的分配后,JobClient向JobTracker申请一个JobID(通过RPC,注意基本上JobClient与JobTracker的所有通信都是通过RPC完成的,如果下文没有显示著名也应该属于这种情况)。
JobID jobId = jobSubmitClient.getNewJobId();
下面是JobTracker.getNewJobId的具体实现:
publicsynchronized JobID getNewJobId() throws IOException {
returnnew JobID(getTrackerIdentifier(), nextJobId++);
}
获得JobID后,将该JobID与上面的stagingRootDir组合就构成了Job文件的具体存放地址的构建。进行这些相关工作后,JobClient将相关的文件存储到HDFS......余下全文>>
 

相关内容