Hadoop源码分析之IPC服务端连接建立与方法调用


在前面两篇文章中分析了IPC中Server端的初始化与启动和 客户端IPC连接与方法调用下面来分析服务器当客户端进行方法调用时,将数据发送给服务器后,服务器的处理过程。服务器处理过程涉及的了多个线程与类,正式服务器端的各个部分的合作,才使得IPC机制简单且高效。

连接建立

在org.apache.hadoop.ipc.Client.Connection.setupConnection()方法中的NetUtils.connect(this.socket, server, 20000);这行代码执行之后,客户端就向服务器端发送了一个连接请求。客户端的连接请求统一在在服务器端的org.apache.hadoop.ipc.Server.Listener类中的run()方法中进行处理,即在Listener线程中接收客户端的连接请求。服务器端使用Java NIO技术来接收IPC客户端的连接请求和处理方法调用相关过程,关于Java NIO请参见Java NIO。Server.Listener类中的run()方法代码如下:

@Override
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }

在run()方法中首先使用一个ThreadLocal遍历将当前的Listener线程与启动Listener线程的Server对象关联,然后进入到一个循环,循环结束的条件是启动Listener线程的服务器停止运行,即running变量为false值,否则Listener线程一直处于运行状态接收客户端的连接请求。循环内部是标准的Java NIO中SocketServer的代码,遍历连接请求,然后逐个处理,如果连接请求有效,就使用方法doAccept()方法处理。doAccept()方法接收一个SelectionKey对象作为参数,在Java NIO中SelectionKey保存了Socket读写相关的信息,如doAccept()方法中通过ServerSocketChannel server = (ServerSocketChannel) key.channel();获取到数据通道ServerSocketChannel对象server,然后通过server.accept()方法就接收了客户端的一个连接请求,与客户端建立了一个连接。连接建立成功之后就配置连接通道信息如设置为非阻塞,设置不使用Nagle算法等。然后就获取一个Reader线程,Reader对象是在Listener创建过程中创建好的,放在一个由数组构成的循环队列中,通过getReader方法获取到。对于Reader类的解释,很多博文上都说一个客户端对应一个Reader线程,但是我觉得不是这样的,因为一个Reader的个数是由ipc.server.read.threadpool.size这个属性指定的,其默认值是1,而客户端的在同一时间访问同一个IPC服务器的数量可能不是一定的,不可能事先就确定好IPC客户端的数量,而Reader中有一个Selector变量readSelector变量,Selector类是Java NIO中的一个选择器类,它可以检测多个通道(Channel),所以没有必要一个客户端对应一个Reader线程。在IPC机制中,Reader线程的数量是可以配置的,觉得这个变量可以配置的原因是通过配置Reader线程的数量而提高性能。不知道这样理解有没有错误,如果有错误欢迎同学们指出。谢谢。doAccept()方法的代码如下:

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }

      }
    }

doAccept()方法中通过getReader()方法获取到一个Reader对象之后,就调用Reader.startAdd()方法,在startAdd()方法中需要调用readSelector.wakeup();来唤醒当前的Reader线程,这是因为在Listener的构造方法中启动了Reader线程,Reader线程启动后就执行了Listener.Reader.run()方法,这样执行到Listener.Reader.run()方法中的这行代码时如果没有客户端的请求就阻塞了,在Reader.startAdd()方法中通过将该Reader线程唤醒,这样Listener.Reader.run()方法就继续往下执行。还是回到doAccept()方法,在Reader.startAdd()方法调用完成,唤醒Reader线程之后,就继续注册当前接收客户端连接创建的通道到Reader中的Selector对象readSelector中(Reader.registerChannel()方法),注册通道的方法返回一个选择键(SelectionKey)对象,然后就创建服务器端的Connection对象。服务器端的Connection对象与客户端类似,它保存的是服务器端与客户端的连接中的服务器端需要的信息。连接创建成功之后就将选择键与连接相关连(readKey.attach(c)),Connection对象作为选择键的附加对象,用于服务器端的与客户端的通信处理。再将当前的Connection对象加入到连接列表connectionList中,最后调用Reader.finishAdd()方法表示Reader添加完毕,这个Reader就可以顺利执行其他处理了,如果Reader.startAdd()方法调用了,而Reader.finishAdd()还未调用,那么在Listener.Reader.run()方法中会阻塞在一个while()循环中。Listenre.Reader.run()方法的代码如下:

      public void run() {
        LOG.info("Starting SocketReader");
        synchronized (this) {
          while (running) {
            SelectionKey key = null;
            try {
              readSelector.select();
              while (adding) {如果调用了Reader.startAdd()方法,而Reader.finishAdd()还未调用,则会阻塞在这里
                this.wait(1000);
              }              

              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                if (key.isValid()) {
                  if (key.isReadable()) {
                    doRead(key);
                  }
                }
                key = null;
              }
            } catch (InterruptedException e) {
              if (running) {                      // unexpected -- log it
                LOG.info(getName() + " caught: " +
                         StringUtils.stringifyException(e));
              }
            } catch (IOException ex) {
              LOG.error("Error in Reader", ex);
            }
          }
        }
      }

上面的代码中,readSelector.select()方法就是Reader线程启动之后如果没有客户端连接请求就会阻塞的位置,当客户端请求到达后,需要通过Listener.Reader.startAdd()方法唤醒。待Listener.Reader.finishAdd()方法完成之后,adding变量就为false了,那么就可以顺利向下执行了。下面优势一段标准的Java NIO代码,先取道当前Reader线程中的有效的选择键的迭代器,然后迭代处理,选择键(SelectionKey中保存了interest集合,ready集合,Channel,Selector,附加的对象(这是就是在doAccept中创建并添加的Connection对象),Java NIO更相信的信息可以参见Java NIO)得到之后就对有效的可读选择键执行doRead()方法读取客户端发送过来的数据。doRead()方法的代码如下:

void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   

在doRead()方法中首先通过选择键(SelectionKey)获取到键上的附加对像(在这里是Connection对象,在doAccept()方法中添加的)。然后设置Connection对象的lastContact属性,lastContact属性表示当前Connection与客户端最后一次交互的时间,下面就是调用Connection中的Connection.readAndProcess()方法来处理。

在Connection.readAndProcess()方法中,使用dataLengthBuffer字段读取客户端发送的RPC头部的IPC连接魔数和每个数据帧的长度。首先读取客户端发送的RPC头部信息,关于客户端发送给服务器的数据格式,可以参考博文Hadoop源码分析之IPC连接与方法调用。RPC头部信息首先是四个字节的IPC连接魔数,之所以是四个字节,是因为IPC规定客户端发送给服务器端的数据长度是也是四个字节,保存在dataLengthBuffer这个ByteBuffer类型的缓冲区中,dataLengthBuffer变量在初始化时分配了四个字节。然后再读取两个字节的数据,第一个字节是IPC协议版本号(当前版本是4),第二个字节是认证方式,是AuthMethod枚举类型的code变量,占一个字节。读取完RPC头部信息后就对比魔数,版本号,和认证方式是否正确,只有三个信息都正确,才继续向下执行,否则就报错返回。

当RPC头部数据读取完成之后,设置rpcHeaderRead值为true,表示RPC头部的消息已经读取并验证,接下来可以读RPC的数据部分了。紧接着RPC数据部分的是ConnectionHeader(连接头部),读取连接头部时也是用缓存dataLengthBuffer先读取连接头部的长度(四个字节),长度部分读取完成之后再读取数据部分。先根据dataLengthBuffer缓存数据的值确定数据缓存的大小,然后进行读取直到缓存读满。数据缓存读满之后,根据变量headerRead判断连接头是否处理了,如果连接头还没有处理,就调用方法processHeader()来处理连接头,连接头包括调用的接口名和认证信息等。如果已经处理完了连接头信息,就调用方法processData()处理具体的调用。processData()方法和processHeader()方法放在了processOneRpc()方法中调用,processOneRpc()方法每调用完一次就设置data=null,为下次接收做准备,下次再使用data接收数据时重新为data分配缓存大小。

Connection.readAndProcess()的方法代码如下:

/**
     * 读取客户端发送的数据并根据数据类型进行相应的处理
     * @return 返回读取到数据的长度
     */
    public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        /* Read at most one RPC. If the header is not read completely yet
         * then iterate until we read first RPC or until there is no data left.
         */    
        int count = -1;
        //dataLengthBuffer是四个字节,用于读取IPC连接魔数和数据长度
        if (dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, dataLengthBuffer);       
          if (count < 0 || dataLengthBuffer.remaining() > 0) 
            return count;
        }
      
        if (!rpcHeaderRead) {//如果RPC头部还没有读取那么就读取RPC头部
          //Every connection is expected to send the header.
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
          int version = rpcHeaderBuffer.get(0);//第一个字节是接口协议版本
          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//认证方式的code
          authMethod = AuthMethod.read(new DataInputStream(
              new ByteArrayInputStream(method)));
          dataLengthBuffer.flip();          
          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
            //Warning is ok since this is not supposed to happen.
            LOG.warn("Incorrect header or version mismatch from " + 
                     hostAddress + ":" + remotePort +
                     " got version " + version + 
                     " expected version " + CURRENT_VERSION);
            return -1;
          }
          dataLengthBuffer.clear();
          if (authMethod == null) {
            throw new IOException("Unable to read authentication method");
          }
          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
            AccessControlException ae = new AccessControlException("Authorization ("
              + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
              + ") is enabled but authentication ("
              + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
              + ") is configured as simple. Please configure another method "
              + "like kerberos or digest.");
            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
                null, ae.getClass().getName(), ae.getMessage());
            responder.doRespond(authFailedCall);
            throw ae;
          }
          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
            doSaslReply(SaslStatus.SUCCESS, new IntWritable(
                SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
            authMethod = AuthMethod.SIMPLE;
            // client has already sent the initial Sasl message and we
            // should ignore it. Both client and server should fall back
            // to simple auth from now on.
            skipInitialSaslHandshake = true;
          }
          if (authMethod != AuthMethod.SIMPLE) {
            useSasl = true;
          }
          
          rpcHeaderBuffer = null;
          rpcHeaderRead = true;//RPC头部读取完成
          continue;//接下来读取数据部分:连接头部或方法数据
        }
        
        if (data == null) {
          dataLengthBuffer.flip();
          dataLength = dataLengthBuffer.getInt();
       
          if (dataLength == Client.PING_CALL_ID) {
            if(!useWrap) { //covers the !useSasl too
              dataLengthBuffer.clear();
              return 0;  //ping message
            }
          }
          if (dataLength < 0) {
            LOG.warn("Unexpected data length " + dataLength + "!! from " + 
                getHostAddress());
          }
          data = ByteBuffer.allocate(dataLength);
        }
        
        count = channelRead(channel, data);
        
        if (data.remaining() == 0) {//读到一个完整的消息
          dataLengthBuffer.clear();
          data.flip();
          if (skipInitialSaslHandshake) {
            data = null;
            skipInitialSaslHandshake = false;
            continue;
          }
          boolean isHeaderRead = headerRead;
          if (useSasl) {
            saslReadAndProcess(data.array());
          } else {
            processOneRpc(data.array());
          }
          data = null;
          if (!isHeaderRead) {//如果isHeaderRead为false,则刚刚在processOneRpc()方法中处理的是ConnectionHeader,那么就继续处理数据部分
            continue;
          }
        } 
        return count;
      }
    }
有一点需要注意的是Reader.run()方法中使用Java NIO方式处理客户端发送过来的数据,由于网络发送数据的不确定性,不一定每次接收都会接收一个完整的消息,所以在readAndProcess()方法中dataLengthBuffer和data这两个缓存都需要等到读取完成之后,才能进行下一步的处理,否则方法返回刚刚读取到的数据字节数count,继续等待读取下一次到达的数据。dataLengthBuffer缓存读满,表示IPC魔数读取完成或者是数据长度读取完成,只有IPC魔数读取完成,才能判断这个过程是否出错,只有数据长度读取完成,才能判断数据部分的长度,从而确定数据缓存的大小。data缓存读满表示读取到一个完整的消息(连接头部数据或者是一次调用数据),这样才能正确的进行下一步的处理。

在readAndProcess()方法中也处理了客户端发送过来的ping请求,因为客户端在与服务器端连接上以后,需要与服务器保持连接,所以会每隔一定时间发送一个ping请求,这个暂且不分析。

处理从客户端接收的数据主要集中在processOneRpc()方法中,首先根据变量headerReader判断是否读取完了客户端IPC调用的头部信息,如果还没有读取头部信息,那么就调用processHeader()方法进行处理,然后设置headerRead为true,如果已经读取了头部信息headerRead的值就是false,就会调用processData()方法,源代码如下:

/**
     * readAndProcess()方法接收到客户端RPC调用的数据之后,在这个方法里进行处理
     */
    private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
    }

对于客户端发送过来的信息,服务器端如何进行处理呢?Hadoop自建了一种序列化的机制,序列化就是[透过网络传送资料时进行编码的过程,可以是字节或是XML等格式。而字节的或XML编码格式可以还原完全相等的对象。这程序被应用在不同应用程式之间传送对象,以及服务器将对象储存到档案或数据库。相反的过程又称为反序列化。fromhttp://zh.wikipedia.org/wiki/%E5%BA%8F%E5%88%97%E5%8C%96]Hadoop进行序列化就要实现Writable接口,然后实现方法和方法。write()方法就是将该对象序列化,生成该对象对应的字节码数据,这个过程就是序列化,readFields()方法与之相反,将write()方法生成的字节数据反序列化生成一个Java对象,这个过程称为反序列化。processHeader()方法先将ConnectionHeader对象对应的序列化数据反序列化(通过ConnectionHeader.readFields()方法),生成一个ConnectionHeader对象,然后再通过这个对象生成其他的对象和进行权限检查,具体可以查看processHeader()方法。processData()方法首先读取一个int类型数据,即客户端发送的调用(Call)对象的ID,然后再反序列化客户端发送的Writable类型(在这里实际是Invocation类型)的参数,最后生成一个Call类型的对象放入一个阻塞队列callQueue中,等待服务器端进行调用,具体的方法代码如下:

/**
     * 服务器端处理客户端的方法调用,生成一个Call对象,再将Call对象放入一个阻塞队列中
     */
    private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();                    // try to read an id
        
      if (LOG.isDebugEnabled())
        LOG.debug(" got #" + id);

      Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
      param.readFields(dis);        
        
      Call call = new Call(id, param, this);
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

方法调用

在Reader线程中,将调用对象(Call对象)生成并放入阻塞队列callQueue中以后,下面处理方法调用就由Hadnler线程负责了。Handler线程是在Server.start()方法中创建并启动的,代码如下:

/** Starts the service.  Must be called before any calls will be handled. 
   * 该方法调用之后,所有线程才开始工作
   * */
  public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
在Hadoop IPC中,Handler线程主要用于处理方法Reader线程读取并准备好的方法调用,即Call对象,Handler线程从阻塞队列callQueue中取出要调用的方法,然后进行处理。Handler有多个,其数量在创建Server对象的时候通过构造函数的参数指定,如通过RPC.getServer()方法。Handler线程在其run()方法中重复的读取callQueue这个阻塞队列,每次取出一个Call对象,调用RPC.Server.call()方法进行最终的方法调用,Server.java文件中的Server类是一个抽象类,RPC.Server类是它的一个子类,这个call()方法正是在这个方法中实现的。Handler类的run()方法代码如下:

public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here

          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": has #" + call.id + " from " +
                      call.connection);
          
          String errorClass = null;
          String error = null;
          Writable value = null;

          CurCall.set(call);
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject
            if (call.connection.user == null) {
              value = call(call.connection.protocol, call.param, 
                           call.timestamp);
            } else {
              value = 
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // make the call
                       return call(call.connection.protocol, 
                                   call.param, call.timestamp);

                     }
                   }
                  );
            }
          } catch (Throwable e) {
            String logMsg = getName() + ", call " + call + ": error: " + e;
            if (e instanceof RuntimeException || e instanceof Error) {
              // These exception types indicate something is probably wrong
              // on the server side, as opposed to just a normal exceptional
              // result.
              LOG.warn(logMsg, e);
            } else if (exceptionsHandler.isTerse(e.getClass())) {
              // Don't log the whole stack trace of these exceptions.
              // Way too noisy!
              LOG.info(logMsg);
            } else {
              LOG.info(logMsg, e);
            }
            errorClass = e.getClass().getName();
            error = StringUtils.stringifyException(e);
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            // setupResponse() needs to be sync'ed together with 
            // responder.doResponse() since setupResponse may use
            // SASL to encrypt response data and SASL enforces
            // its own message ordering.
            setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error);
          // Discard the large buf and reset it back to 
          // smaller size to freeup heap
          if (buf.size() > maxRespSize) {
            LOG.warn("Large response size " + buf.size() + " for call " + 
                call.toString());
              buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
            }
            responder.doRespond(call);
          }
        } catch (InterruptedException e) {
          if (running) {                          // unexpected -- log it
            LOG.info(getName() + " caught: " +
                     StringUtils.stringifyException(e));
          }
        } catch (Exception e) {
          LOG.info(getName() + " caught: " +
                   StringUtils.stringifyException(e));
        }
      }
      LOG.info(getName() + ": exiting");
    }
上面的方法代码比较清晰,先从callQueue队列中取出一个Call对象,再调用方法call()调用,最后通过Responder线程将调用的结果发送给客户端。这个过程在下一篇文章中再详细分析


--EOF

Reference

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》

相关内容