Hadoop异步rpc通信机制--org.apache.hadoop.ipc.Server


Java NOI非阻塞技术不是开启线程去等待端口的响应,而是采用Reactor模式或Observer模式监听I/O端口,当端口有响应时,会自动通知我们,从而实现流畅的I/O读写。

Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。

Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:

Java NIO参考资料:

Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:

public static Server getServer(final Object instance, final String bindAddress, final int port,
                                final int numHandlers,
                                final boolean verbose, Configuration conf,
                                SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

Server.Call是一个请求类,类似Client.Call,只是添加了Call的时间戳机制:

private static class Call {
    private int id;                      // 请求id
    private Writable param;              // 请求的参数
    private Connection connection;        // 和Client一样,表示一个C/S间的连接
    private long timestamp;              // 时间戳
    private ByteBuffer response;          // server对此次请求的响应结果
...
}

知道了Client.Connection后,显然Server.Connection就是Server到Client的连接。Server.Connection内保存了Client的地址,用于灾难恢复。Server.Connection通过调用readAndProcess对Client进行一些操作:版本校验,读数据头processHeader(获取通信协议protocol,根据头部的ugi信息创建user对象)以及读数据processData(获取Client发送过来的Call.id和params,根据二者建立一个请求call,并将请求call入队callQueue)【readAndProcess方法是在Listener.doRead时调用,此时监听器监听到新连接的读数据事件】。

  • 1
  • 2
  • 3
  • 4
  • 5
  • 下一页

相关内容