Hadoop-06-RPC机制以及HDFS源码分析,hadoop-06-rpchdfs


1.RPC机制

1.1.概述

RPC——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

1.2.调用过程

 

1.3.代码实现

package mavshuang.rpc;
import org.apache.hadoop.ipc.VersionedProtocol;
public interface MyBizable extends VersionedProtocol {
	public static final long VERSION = 123456L;
	public abstract String hello(String name);
}

package mavshuang.rpc;import java.io.IOException;
public class MyBiz implements MyBizable {
public long getProtocolVersion(String protocol , long clientVersion) throws IOException {
		return MyBizable.VERSION;
	}
	public String hello(String name) {
		System.out.println("我被调用了!");
		return "hello" +name;
	}
}

package mavshuang.rpc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyClient {
	public static void main(String[] args) throws Exception {
		final MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class, MyBizable.VERSION, new InetSocketAddress(MyServer.SERVER_IP, MyServer.SERVER_PORT), new Configuration());
		String result = proxy.hello("world,hello mavs!");
		System.out.println("客户端返回的结果:" + result);
		RPC.stopProxy(proxy); // 关闭网络连接
	}
}
说明:RPC.getProxy(),该方法有四个参数,第一个参数是被调用的接口类,第二个是客户端版本号,第三个是服务端地址。返回的代理对象,就是服务端对象的代理,内部就是使用 java.lang.Proxy 实现的。
package mavshuang.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
public class MyServer {
	public static final String SERVER_IP = "localhost";// 指定服务器IP
	public static final int SERVER_PORT = 9999;// 指定服务器端口
	public static void main(String[] args) throws Exception {
		/**
		 * 构造一个 RPC 服务端.
		 * 
		 * @param instance
		 *            实例对象中的方法会被客户端调用
		 * @param bindAddress
		 *            the address to bind on to listen for connection:监听连接的IP地址
		 * @param port
		 *            the port to listen for connections on:监听连接的端口号
		 * @param conf
		 *            the configuration to use:使用的确认协议
		 */
		final Server server = RPC.getServer(new MyBiz(), SERVER_IP, SERVER_PORT, new Configuration());
		server.start();// 启动服务
	}
}
说明:RPC.getServer 方法,该方法有四个参数,第一个参数是被调用的 java对象,第二个参数是服务器的地址,第三个参数是服务器的端口。获得服务器对象后,启动服务器。这样,服务器就在指定端口监听客户端的请求。
运行时,先启动服务端,再启动客户端,运行结果:

 




 
命令行执行 jps 命令,查看输出信息:
 

2.HDFS的分布式存储架构的源码分析

2.1.NameNode的接口分析

 
由上图可以看到NameNode本身就是一个java进程。 观察上图中RPC.getServer()方法的第一个参数,发现是 this,说明 NameNode 本身就是一个位于服务端的被调用对象,即 NameNode 中的方法是可以被客户端代码调用的。根据 RPC 运行原理可知,NameNode暴露给客户端的方法是位于接口中的。

查看 NameNode 的源码,如下图所示 


可以看到 NameNode 实现了 ClientProtocal、DatanodeProtocal、NamenodeProtocal 等接口。下面逐一分析这些接口。
(1)DFSClient调用ClientProtocal
这个接口是供客户端调用的。这里的客户端不是指的自己写的代码,而是hadoop 的一个类叫做 DFSClient。在 DFSClient 中会调用 ClientProtocal 中的方法, 完成一些操作。
该接口中的方法大部分是对 HDFS 的操作,如 create、delete、mkdirs、rename 等。
(2)DataNode调用DatanodeProtocal
这个接口是供 DataNode 调用的。 DataNode 调用该接口中的方法向 NameNode 报告本节点的状态和 block 信息。
NameNode 不能向 DataNode 发送消息,只能通过该接口中方法的返回值向DataNode 传递消息。
(3)SecondaryNameNode调用NamenodeProtocal
这个接口是供 SecondaryNameNode 调用的。SecondaryNameNode 是专门做NameNode 中 edits 文件向 fsimage 合并数据的。

2.2. DataNode的接口分析

按照分析 NameNode 的思路,看一下 DataNode 的源码接口。


 
这里有两个接口,分别是 InterDatanodeProtocal、ClientDatanodeProtocal。


2.3. HDFS的写数据过程分析

通过FileSystem类可以操控HDFS,那么从这里开始分析写数据到HDFS的过程。
在向 HDFS 写文件的时候,调用的是 FileSystem.create(Path path)方法,查看这个方法的源码,通过跟踪内部的重载方法,可以找到如下图所示的调用。

这个方法是抽象类,没有实现。那么只能向他的子类寻找实现。FileSystem 有个子类是 DistributedFileSystem,在的伪分布环境下使用的就是这个类。可以看到
DistributedFileSystem 的这个方法的实现,如图所示。
 
在上图中,注意第 185 行的返回值 FSDataOutputStream。这个返回值对象调用了自己的构造方法, 构造方法的第一个参数是 dfs.create()方法。关注一下这里的 dfs 对象是谁,create 方法做了什么事情。现在进入这个方法的实现,如下图所示。
 

在上图中,返回值正是第 713 行创建的对象。继续查看该方法:


 
在上图中,可 以 看 到,这 个 类 是 DFSClient 的 内 部 类 。 在 类 内 部 通 过 调用namenode.create()方法创建了一个输出流。再看一下 namenode 对象是什么类型,如下图所示:
 

在上图中,可以看到 namenode 其实是 ClientProtocal 接口。那么,这个对象是什么时候创建?如下图所示。


 
 
可以发现,namenode 对象是在 DFSClient 的构造函数调用时创建的,即当 DFSClient 对象存在的时候,namenode 对象已经存在了。
至此,可以看到,使用 FileSystem 对象的 api 操纵 HDFS,其实是通过 DFSClient 对象访问 NameNode 中的方法操纵 HDFS 的。这里的 DFSClient 是 RPC 机制的客户端, NameNode是 RPC 机制的服务端的调用对象,整个调用过程如下图所示。
 
在整个过程中,DFSClient 是个很重要的类,从名称就可以看出,它表示 HDFS 的 Client,是整个 HDFS 的 RPC 机制的客户端部分。对 HDFS 的操作,是通过 FileSsytem 调用的DFSClient 里面的方法。FileSystem 是封装了对 DFSClient 的操作,提供给用户使用的。

2.4. HDFS的读数据过程分析

继续在 FileSystem 类分析,读数据使用的是 open(…)方法,可以看到源码,如下图所示。


 

在上图中,返回的是 DFSClient 类中 DFSDataInputStream 类,显而易见,这是一个内部类。这个内部类的构造函数,有两个形参,第一个参数是 dfs.open(…)创建的对象。看一下方法的源码,如下图所示。


 
在上图的实现中,返回的是一个 DFSInputStream 对象。该对象中含有 NameNode 中的数据块信息。看一下DFSInputStream类的构造方法源码,如下图所示。
 

在上图中,这个构造方法中最重要的语句是第 1834 行,打开信息,从第 1840 行开始是 openInfo()的源代码,截图显示不全。注意第 1841 行,是获取数据块的信息的。查看这一行的源代码,如下图所示。


 从上图中可以看到,获取数据块信息的方法也是通过调用 namenode 取得的。这里的 namenode 属性还是位于 DFSClient 中的。通过前面的分析,已经知道,在DFSClient类中的 namenode 属性是 ClientProtocal。

相关内容