Hadoop异步rpc通信机制--org.apache.hadoop.ipc.Server
Hadoop异步rpc通信机制--org.apache.hadoop.ipc.Server
Java NOI非阻塞技术不是开启线程去等待端口的响应,而是采用Reactor模式或Observer模式监听I/O端口,当端口有响应时,会自动通知我们,从而实现流畅的I/O读写。
Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。
Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:
Java NIO参考资料:
Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
Server.Call是一个请求类,类似Client.Call,只是添加了Call的时间戳机制:
private static class Call {
private int id; // 请求id
private Writable param; // 请求的参数
private Connection connection; // 和Client一样,表示一个C/S间的连接
private long timestamp; // 时间戳
private ByteBuffer response; // server对此次请求的响应结果
...
}
知道了Client.Connection后,显然Server.Connection就是Server到Client的连接。Server.Connection内保存了Client的地址,用于灾难恢复。Server.Connection通过调用readAndProcess对Client进行一些操作:版本校验,读数据头processHeader(获取通信协议protocol,根据头部的ugi信息创建user对象)以及读数据processData(获取Client发送过来的Call.id和params,根据二者建立一个请求call,并将请求call入队callQueue)【readAndProcess方法是在Listener.doRead时调用,此时监听器监听到新连接的读数据事件】。
|
评论暂时关闭