Hadoop RPC(续),hadooprpc


接着上一篇来看server端

在看之前,我们想象一下,服务器端肯定要启动服务,在端口上监听,读取客户端的连接请求和请求数据并处理,最后返回。依次涉及的类:Listener,Connection,Call,Handler,Responser;这些类都是Server的内部类。HDFS分析篇我们知道NameNode,DataNode中有RPC服务的启动,我们从NameNode的main方法开始看,能发现rpcserver是在createNameNode中创建的。走一个,进入createNameNode,最后一行是return new NameNode(conf);继续走,在namenode的构造方法中,它又进入了重载方法。在重载方法中,我们点击进入initialize方法,在initialize方法中有 rpcServer = createRpcServer(conf);这一行代码。确实够曲折的,就这样一直走,你最终会跟到NameNodeRpcServer方法,这个方法挺长的。里面有如下这一行:

 this.clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService).setBindAddress(bindHost)
        .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
点击最后一个build,进去之后再点击最后一个getServer方法。跟了这么久,你就给我看下面这个???

RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                       int port, int numHandlers, int numReaders,
                       int queueSizePerHandler, boolean verbose,
                       Configuration conf, 
                       SecretManager<? extends TokenIdentifier> secretManager,
                       String portRangeConfig
                       ) throws IOException;
说多了全是泪,该方法是在RpcEngine中,选一个WritableRpcEngine实现类看它对getServer的覆写情况:

 public RPC.Server getServer(Class<?> protocolClass,
                      Object protocolImpl, String bindAddress, int port,
                      int numHandlers, int numReaders, int queueSizePerHandler,
                      boolean verbose, Configuration conf,
                      SecretManager<? extends TokenIdentifier> secretManager,
                      String portRangeConfig) 
    throws IOException {
    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }
点击进入new Server(...),就这样一直点,最后又把你带到Server类的构造方法中,构造方法的部分代码如下:

listener = new Listener(); // 服务器端的监听
    this.port = listener.getAddress().getPort();    
    connectionManager = new ConnectionManager();
    this.rpcMetrics = RpcMetrics.create(this, conf);
    this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
    this.tcpNoDelay = conf.getBoolean(
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

    // Create the responder here
    responder = new Responder(); // 服务器端的响应
先进入Listener:

    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port); // java的套接字相关
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open(); // 前面我们分析过java的NIO,知道这是打开一个通道
      acceptChannel.configureBlocking(false); // 设置为非阻塞

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);// 绑定到相应主机与端口
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open(); // java NIO的选择器
      readers = new Reader[readThreads]; // 对于读事件,hadoop启动多个线程
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start(); // 读线程的启动
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT); // 将通道注册到选择器上,并绑定连接事件,当有客户端发起连接请求时,selector会返回,否则一直阻塞
        
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
在上面方法中,服务器端接收客户端的连接,并启动多线程读取来自多个客户端发送的数据

在进入Listener方法的下面一个方法responder new Responder():

 Responder() throws IOException {
      this.setName("IPC Server Responder");
      this.setDaemon(true); // 标注为守护线程
      writeSelector = Selector.open(); // create a selector 创建选择器
      pending = 0;
    }
Responder继承了Thread,看看他的run方法:

 public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      try {
        doRunLoop(); // 点进去
      } finally {
        LOG.info("Stopping " + Thread.currentThread().getName());
        try {
          writeSelector.close();
        } catch (IOException ioe) {
          LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
        }
      }
    }
doRunLoop方法:

    private void doRunLoop() {
      long lastPurgeTime = 0;   // last check for old calls.

      while (running) { // 如果在运行中
        try {
          waitPending();     // If a channel is being registered, wait.// 通道注册需要阻塞
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); // 得到选择器的迭代器
          while (iter.hasNext()) { // 遍历之,看看有没有相应的事件
            SelectionKey key = iter.next();
            iter.remove(); // 处理了就要把相应的事件移除队列
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key); // 同步写看见木有,点,哪里不会点哪里
              }
            } catch (IOException e) {
              LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
            }
          }
          long now = Time.now();
          if (now < lastPurgeTime + PURGE_INTERVAL) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          if(LOG.isDebugEnabled()) {
            LOG.debug("Checking for old call responses.");
          }
          ArrayList<Call> calls;
          
          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) { 
                calls.add(call);
              }
            }
          }
          
          for(Call call : calls) {
            doPurge(call, now); 
          }
        } catch (OutOfMemoryError e) {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          LOG.warn("Out of Memory in server select", e);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          LOG.warn("Exception in Responder", e);
        }
      }
    }
doAsyncWrite(SelectionKey key) 方法

 private void doAsyncWrite(SelectionKey key) throws IOException {
      Call call = (Call)key.attachment();
      if (call == null) {
        return;
      }
      if (key.channel() != call.connection.channel) {
        throw new IOException("doAsyncWrite: bad channel");
      }

      synchronized(call.connection.responseQueue) { 
        if (processResponse(call.connection.responseQueue, false)) { // 处理响应,英文还不算差,呵呵,进去看看
          try {
            key.interestOps(0);
          } catch (CancelledKeyException e) {
            /* The Listener/reader might have closed the socket.
             * We don't explicitly cancel the key, so not sure if this will
             * ever fire.
             * This warning could be removed.
             */
            LOG.warn("Exception while changing ops : " + e);
          }
        }
      }
    }
processResponse方法如下:

 private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) { // 上锁
          //
          // If there are no items for this channel, then we are done
          //
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //
          // Extract the first call
          //
          call = responseQueue.removeFirst(); // 这是先进先出的队列吗,没怎么去看,姑且这么认为吧
          SocketChannel channel = call.connection.channel;
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
          }
          //
          // Send as much data as we can in the non-blocking fashion
          // 在非阻塞情况下尽可能多的发送数据
          //
          int numBytes = channelWrite(channel, call.rpcResponse); // 向通道写服务器端返回的数据,具体就不进去看了,还有好多层,这里写了之后并返回写的字节长度

          if (numBytes < 0) { // 如果长度小于0,则表示数据已经写完
            return true;
          }
          if (!call.rpcResponse.hasRemaining()) { // 如果都写完了,就把缓冲区回收了
            //Clear out the response buffer so it can be collected
            call.rpcResponse = null;
            call.connection.decRpcCount();
到这里也就差不多了,服务器端的写结束了

说句实话,hadoop是世界众多顶尖工程师开发的,n次升级后,真的是很复杂,需要慢慢看。里面很多业务细节,我也没怎么看,毕竟好几十万行代码吧。


    

相关内容