Hadoop源码分析之IPC服务端连接建立与方法调用(续)


在上一篇文章Hadoop源码分析之IPC服务端连接建立与方法调用中分析到了Handler线程的run()方法。下面来看看如果服务器端如何将调用结果发送给客户端。

发送远程方法调用结果

在Server.Handler.run()方法中,首先从阻塞队列callQueue中取出一个方法调用对象(Call对象),然后调用RPC.Server.call()方法利用反射机制进行实际方法的调用。Server.java文件中的Server类是一个抽象类,Server.call()方法是其一个抽象方法,而RPC.Server继承自抽象类Server,并且实现了call()方法,RPC.Server.call()方法部分代码如下:

/**
     * 在服务端进行实际方法调用
     * @param protocol 接口类
     * @param param 调用参数
     * @param receivedTime 接受的时间
     */
    public Writable call(Class<?> protocol, Writable param, long receivedTime)  throws IOException {
        Invocation call = (Invocation)param;

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        //在IPC服务器实现对象instance上调用相应的方法
        Object value = method.invoke(instance, call.getParameters());
        int processingTime = (int) (System.currentTimeMillis() - startTime);
        int qTime = (int) (startTime-receivedTime);//
        if (LOG.isDebugEnabled()) {
          LOG.debug("Served: " + call.getMethodName() +
                    " queueTime= " + qTime +
                    " procesingTime= " + processingTime);
        }
        return new ObjectWritable(method.getReturnType(), value);
      }
call()方法首先获取到要调用的方法,再利用Java反射技术在真实的对象上执行方法调用,调用完成后将调用的结果和结果类型封装成一个ObjectWritable类型,所以call()方法的返回值包含了方法返回值的类型和具体的返回值。使用ObjectWritable类型可以对其对象上的各个属性进行序列化,所以有了这个类就省去了为每个在服务器和客户端之间传递类型建立一个实现了Writable接口的类的需要。call()待方法调用完成后,如果是调用成功,就得到了调用的返回值即ObjectWritable对象,如果是调用失败就记录失败信息,然后调用方法Server.setupResponse()将方法调用的结果序列化准备返回给客户端,Server.setupResponse()方法的代码如下:

/**
   * Setup response for the IPC Call.
   * @param response buffer to serialize the response into
   * @param call {@link Call} to which we are setting up the response
   * @param status {@link Status} of the IPC call
   * @param rv return value for the IPC Call, if the call was successful
   * @param errorClass error class, if the the call failed
   * @param error error message, if the call failed
   * @throws IOException
   */
  private void setupResponse(ByteArrayOutputStream response, 
                             Call call, Status status, 
                             Writable rv, String errorClass, String error)throws IOException {
    response.reset();
    DataOutputStream out = new DataOutputStream(response);
    out.writeInt(call.id);                // write call id
    out.writeInt(status.state);           // write status

    if (status == Status.SUCCESS) {
      rv.write(out);
    } else {
      WritableUtils.writeString(out, errorClass);
      WritableUtils.writeString(out, error);
    }
    if (call.connection.useWrap) {//安全相关
      wrapWithSasl(response, call);
    }
    call.setResponse(ByteBuffer.wrap(response.toByteArray()));
  }

在Server.setupResponse()方法中,先向输出缓存中写入方法调用对象(Call对象)的id和方法调用的状态(org.apache.hadoop.ipc.Status.SUCCESS和ERROR),然后根据方法调用的状态来写出具体的内容,如果方法调用成功,调用返回值对象的(ObjectWritable对象,即rv是一个ObjectWritable对象)的write方法,将这个返回值写出到输出缓存中,如果方法调用失败,则写出错误的类型和具体的错误信息,将数据都写到输出缓存后,就调用Call类的setResponse()方法,这个方法就是让Call类的response变量指向输出缓存,这样Call对象就记录了所有的有关调用的信息。接下来就只需要将Call对象中的数据发送给客户端了。将数据发送给客户端的过程将会在Responder线程中完成。

Responder类继承自Thread类,是一个线程,一个Server对象对应一个Responder线程,所以所有客户端的的IPC调用结果经由Responder返回,但是也有可能是在Handler线程中返回,为什么这样说呢?下面来分析这个过程。

在Handler的run()方法中,将返回给客户端的数据序列化完成之后,会调用这行代码,这行代码的作用就是将调用的对象(Call对象,其实就是使用了Call对象中的response变量)放入一个队列中,Responder.doRespond()方法代码如下:

void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
          processResponse(call.connection.responseQueue, true);
        }
      }
    }
从代码中可以看出,首先是将Call对象加入到一个队列中,这个队列是Connection类的一个LinkedList对象,存储了返回给客户端的数据,暂且称为响应队列吧。然后如果响应队列的大小为1,就调用方法processResponse()进行处理,即将数据发送给客户端,注意,这个将数据发送给客户端的过程是在Handler线程中,而不是在Responder线程中,因为doRespond()方法是在Handler类的run()方法中调用的,也就是说processResponse()方法可以在Responder中调用,也可以在Handler中调用,若该方法在Handler中被调用,则processResponse()方法的第二个参数inHandler为true,此时responseQueue中只有一个等待被发送的远程调用,如果是在Responder线程中调用,那么inHandler参数为false,responseQueue队列中可以有一个也可以有多个等待被发送的远程调用。processResponse()方法的代码如下:

/**
     * 向服务器端发送一个远程调用结果,发送完成后如果没有其他的要发送,则返回true
     * @param responseQueue
     * @param inHandler
     */
    private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.该通道上没有数据要发送
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) {
          numElements = responseQueue.size();
          if (numElements == 0) {//如果通道上没有数据要发送,则返回
            error = false;
            return true;              // no more data for this channel.
          }
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": responding to #" + call.id + " from " +
                      call.connection);
          }
          int numBytes = channelWrite(channel, call.response);//在非阻塞模式下尽可能多发送数据
          if (numBytes < 0) {
            return true;
          }
          if (!call.response.hasRemaining()) {//应答数据已经写完
            call.connection.decRpcCount();
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + ": responding to #" + call.id + " from " +
                        call.connection + " Wrote " + numBytes + " bytes.");
            }
          } else {
            //应答数据没有写完,插入队列头,等待再次发送,只有在执行一次发送之后,没有发送完数据,再次加入队列后,才会出现响应队列上有多个调用结果的情况
            call.connection.responseQueue.addFirst(call);
            
          //processResponse可以在Responder中调用,也可以在Handler中调用,若该方法在Handler中被调用,则inHandler参数为true
        	//同时responseQueue中只有一个等待被发送的远程调用,如果inHandler为false,则responseQueue可以有一个也可以有多个
            if (inHandler) {
              // set the serve time when the response has to be sent later,是在Handler中调用的
              call.timestamp = System.currentTimeMillis();
              //成员变量pending++,该变量表示现在有多少个线程在进行通道(发送)注册
              incPending();
              try {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
            	//唤醒可能处于select中等待的Responder选择器
                writeSelector.wakeup();//TODO 难道发送数据的过程中也存在着阻塞?
                //将通道注册到该选择器上
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + ": responding to #" + call.id + " from " +
                        call.connection + " Wrote partial " + numBytes + 
                        " bytes.");
            }
          }
          error = false;              // everything went off well
        }
      } finally {
        if (error && call != null) {
          LOG.warn(getName()+", call " + call + ": output error");
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      }
      return done;
    }

从上面的代码中可以看出,如果一次发送就将发送给客户端的响应数据发送完成,并且响应队列中没有其他的数据需要发送了,那么processResponse()方法返回true,如果此次发送数据发送完成,但是响应队列中还有其他数据需要发送,那么返回false(在Handler线程中调用processResponse()方法并不会使用该方法的返回值),如果一次数据发送未将数据发送完成,那么就将这个调用对象(Call对象)再次加入到响应队列中等待下次调用。将调用对象再次加入到响应队列后,就唤醒可能处于等待状态的Selector选择器,再将这个选择其注册到当前连接的通道(Channel)上,这些执行成功后就返回true。

上面说过响应队列上的远程调用对象可能有多个,那么在什么情况下可能有多个响应队列呢?再回到doRespond()方法,如果当前连接上的第一个远程调用执行到了Handler线程的run()方法中的这行代码,此时响应队列responseQueue大小为1,那么进入到doRespond()方法后,即使用关键字对进行同步,在代码块执行的过程中,任何代码都不能访问对象,所以在这个过程中响应队列不会大于1。然后进入到processResponse()方法,在这个方法中,如果一次数据发送就将数据发送完成,那么,响应队列大小就会变为0,如果一次发送没有将数据发送完成,那么远程调用对象重新加入到响应队列中,processResponse()方法调用完成之后,响应队列的大小仍为1,所以doRespond()方法调用完成时,响应队列大小为1,此时其他线程就可以再向响应队列中增加远程调用对象了。从这个过程中可以看出,如果响应队列的大小大于1,那么一定是某一次的数据发送未发送完成。

注意到processResponse()方法中也有一个代码块,也是对响应队列responseQueue进行同步,这个应该是针对Responder线程设计的,Responder.run()方法代码如下:

public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);//该Responder线程属于哪个Server对象
      long lastPurgeTime = 0;   // last check for old calls.最后一次发送完数据的时间

      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.等待登记通道,TODO ?
          writeSelector.select(PURGE_INTERVAL);//等待通道可写
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key);
              }
            } catch (IOException e) {
              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
            }
          }
          long now = System.currentTimeMillis();
          if (now < lastPurgeTime + PURGE_INTERVAL) {//TODO ???
            continue;//TODO ???
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //已经超过了清理时间,可以检查未处理的Call
          LOG.debug("Checking for old call responses.");
          ArrayList<Call> calls;
          
          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) { 
                calls.add(call);
              }
            }
          }
          
          for(Call call : calls) {
            try {
              doPurge(call, now);
            } catch (IOException e) {
              LOG.warn("Error in purging old calls " + e);
            }
          }
        } catch (OutOfMemoryError e) {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          LOG.warn("Out of Memory in server select", e);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          LOG.warn("Exception in Responder " + 
                   StringUtils.stringifyException(e));
        }
      }
      LOG.info("Stopping " + this.getName());
    }
在Responder.run()方法中,首先是调用waitPending()方法等待通道可用,waitPending()方法就是判断Responder.pending变量是否大于0,如果pending大于0,则表示通道不可用,那么线程等待。

waitPending()方法执行返回后,下面是一段标准的Java NIO的代码,就是看当前的选择器上是否有可写的通道,如果有,那么就调用Responder.doAsyncWrite()方法给客户端发送远程调用的结果。Responder.doAsyncWrite()方法调用了processResponse()方法进行数据发送。Responder.doAsyncWrite()方法的代码如下:

private void doAsyncWrite(SelectionKey key) throws IOException {
      Call call = (Call)key.attachment();
      if (call == null) {
        return;
      }
      if (key.channel() != call.connection.channel) {
        throw new IOException("doAsyncWrite: bad channel");
      }

      synchronized(call.connection.responseQueue) {
        if (processResponse(call.connection.responseQueue, false)) {
          try {
            key.interestOps(0);
          } catch (CancelledKeyException e) {
            /* The Listener/reader might have closed the socket.
             * We don't explicitly cancel the key, so not sure if this will
             * ever fire.
             * This warning could be removed.
             */
            LOG.warn("Exception while changing ops : " + e);
          }
        }
      }
    }
在doAsyncWrite()方法中调用processResponse()方法后需要使用processResponse()方法的返回值,如果返回值为true,那么当前通道上的数据发送完成,就消除当前SelectionKey感兴趣的事件。如果为false,那么什么也不做。doAsyncWrite()方法名中有async,不知道是不是表示异步的意思。

Responder.run()方法剩余的部分就是对哪些长时间没有处理的远程调用进行处理,如果一个远程调用结果长时间未处理,则将丢弃这个调用,对这部分代码还不是很明白,所以就不多说了,以后理解了再补上。

总结

Hadoop的IPC源码分析就到这里了,IPC主要分为客户端和服务器端,客户端使用了Java动态代理,所以客户端所有的方法调用都集中在了Invoker.invoke()方法中,服务器端主要通过Listener线程,Reader线程,Handler线程,Responder线程来处理客户端的调用,各个线程分工明确,很好的完成了IPC远程方法调用。

--EOF

Reference

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》


相关内容