Hadoop源码分析之IPC服务端连接建立与方法调用
Hadoop源码分析之IPC服务端连接建立与方法调用
在前面两篇文章中分析了IPC中Server端的初始化与启动和 客户端IPC连接与方法调用下面来分析服务器当客户端进行方法调用时,将数据发送给服务器后,服务器的处理过程。服务器处理过程涉及的了多个线程与类,正式服务器端的各个部分的合作,才使得IPC机制简单且高效。
连接建立
在org.apache.hadoop.ipc.Client.Connection.setupConnection()方法中的NetUtils.connect(this.socket, server, 20000);这行代码执行之后,客户端就向服务器端发送了一个连接请求。客户端的连接请求统一在在服务器端的org.apache.hadoop.ipc.Server.Listener类中的run()方法中进行处理,即在Listener线程中接收客户端的连接请求。服务器端使用Java NIO技术来接收IPC客户端的连接请求和处理方法调用相关过程,关于Java NIO请参见Java NIO。Server.Listener类中的run()方法代码如下:
@Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { closeConnection(connectionList.remove(0)); } } }
在run()方法中首先使用一个ThreadLocal遍历将当前的Listener线程与启动Listener线程的Server对象关联,然后进入到一个循环,循环结束的条件是启动Listener线程的服务器停止运行,即running变量为false值,否则Listener线程一直处于运行状态接收客户端的连接请求。循环内部是标准的Java NIO中SocketServer的代码,遍历连接请求,然后逐个处理,如果连接请求有效,就使用方法doAccept()方法处理。doAccept()方法接收一个SelectionKey对象作为参数,在Java NIO中SelectionKey保存了Socket读写相关的信息,如doAccept()方法中通过ServerSocketChannel server = (ServerSocketChannel) key.channel();获取到数据通道ServerSocketChannel对象server,然后通过server.accept()方法就接收了客户端的一个连接请求,与客户端建立了一个连接。连接建立成功之后就配置连接通道信息如设置为非阻塞,设置不使用Nagle算法等。然后就获取一个Reader线程,Reader对象是在Listener创建过程中创建好的,放在一个由数组构成的循环队列中,通过getReader方法获取到。对于Reader类的解释,很多博文上都说一个客户端对应一个Reader线程,但是我觉得不是这样的,因为一个Reader的个数是由ipc.server.read.threadpool.size这个属性指定的,其默认值是1,而客户端的在同一时间访问同一个IPC服务器的数量可能不是一定的,不可能事先就确定好IPC客户端的数量,而Reader中有一个Selector变量readSelector变量,Selector类是Java NIO中的一个选择器类,它可以检测多个通道(Channel),所以没有必要一个客户端对应一个Reader线程。在IPC机制中,Reader线程的数量是可以配置的,觉得这个变量可以配置的原因是通过配置Reader线程的数量而提高性能。不知道这样理解有没有错误,如果有错误欢迎同学们指出。谢谢。doAccept()方法的代码如下:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } }
doAccept()方法中通过getReader()方法获取到一个Reader对象之后,就调用Reader.startAdd()方法,在startAdd()方法中需要调用readSelector.wakeup();来唤醒当前的Reader线程,这是因为在Listener的构造方法中启动了Reader线程,Reader线程启动后就执行了Listener.Reader.run()方法,这样执行到Listener.Reader.run()方法中的这行代码时如果没有客户端的请求就阻塞了,在Reader.startAdd()方法中通过将该Reader线程唤醒,这样Listener.Reader.run()方法就继续往下执行。还是回到doAccept()方法,在Reader.startAdd()方法调用完成,唤醒Reader线程之后,就继续注册当前接收客户端连接创建的通道到Reader中的Selector对象readSelector中(Reader.registerChannel()方法),注册通道的方法返回一个选择键(SelectionKey)对象,然后就创建服务器端的Connection对象。服务器端的Connection对象与客户端类似,它保存的是服务器端与客户端的连接中的服务器端需要的信息。连接创建成功之后就将选择键与连接相关连(readKey.attach(c)),Connection对象作为选择键的附加对象,用于服务器端的与客户端的通信处理。再将当前的Connection对象加入到连接列表connectionList中,最后调用Reader.finishAdd()方法表示Reader添加完毕,这个Reader就可以顺利执行其他处理了,如果Reader.startAdd()方法调用了,而Reader.finishAdd()还未调用,那么在Listener.Reader.run()方法中会阻塞在一个while()循环中。Listenre.Reader.run()方法的代码如下:
public void run() { LOG.info("Starting SocketReader"); synchronized (this) { while (running) { SelectionKey key = null; try { readSelector.select(); while (adding) {如果调用了Reader.startAdd()方法,而Reader.finishAdd()还未调用,则会阻塞在这里 this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } }
上面的代码中,readSelector.select()方法就是Reader线程启动之后如果没有客户端连接请求就会阻塞的位置,当客户端请求到达后,需要通过Listener.Reader.startAdd()方法唤醒。待Listener.Reader.finishAdd()方法完成之后,adding变量就为false了,那么就可以顺利向下执行了。下面优势一段标准的Java NIO代码,先取道当前Reader线程中的有效的选择键的迭代器,然后迭代处理,选择键(SelectionKey中保存了interest集合,ready集合,Channel,Selector,附加的对象(这是就是在doAccept中创建并添加的Connection对象),Java NIO更相信的信息可以参见Java NIO)得到之后就对有效的可读选择键执行doRead()方法读取客户端发送过来的数据。doRead()方法的代码如下:
void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections); closeConnection(c); c = null; } else { c.setLastContact(System.currentTimeMillis()); } }
在doRead()方法中首先通过选择键(SelectionKey)获取到键上的附加对像(在这里是Connection对象,在doAccept()方法中添加的)。然后设置Connection对象的lastContact属性,lastContact属性表示当前Connection与客户端最后一次交互的时间,下面就是调用Connection中的Connection.readAndProcess()方法来处理。
在Connection.readAndProcess()方法中,使用dataLengthBuffer字段读取客户端发送的RPC头部的IPC连接魔数和每个数据帧的长度。首先读取客户端发送的RPC头部信息,关于客户端发送给服务器的数据格式,可以参考博文Hadoop源码分析之IPC连接与方法调用。RPC头部信息首先是四个字节的IPC连接魔数,之所以是四个字节,是因为IPC规定客户端发送给服务器端的数据长度是也是四个字节,保存在dataLengthBuffer这个ByteBuffer类型的缓冲区中,dataLengthBuffer变量在初始化时分配了四个字节。然后再读取两个字节的数据,第一个字节是IPC协议版本号(当前版本是4),第二个字节是认证方式,是AuthMethod枚举类型的code变量,占一个字节。读取完RPC头部信息后就对比魔数,版本号,和认证方式是否正确,只有三个信息都正确,才继续向下执行,否则就报错返回。
当RPC头部数据读取完成之后,设置rpcHeaderRead值为true,表示RPC头部的消息已经读取并验证,接下来可以读RPC的数据部分了。紧接着RPC数据部分的是ConnectionHeader(连接头部),读取连接头部时也是用缓存dataLengthBuffer先读取连接头部的长度(四个字节),长度部分读取完成之后再读取数据部分。先根据dataLengthBuffer缓存数据的值确定数据缓存的大小,然后进行读取直到缓存读满。数据缓存读满之后,根据变量headerRead判断连接头是否处理了,如果连接头还没有处理,就调用方法processHeader()来处理连接头,连接头包括调用的接口名和认证信息等。如果已经处理完了连接头信息,就调用方法processData()处理具体的调用。processData()方法和processHeader()方法放在了processOneRpc()方法中调用,processOneRpc()方法每调用完一次就设置data=null,为下次接收做准备,下次再使用data接收数据时重新为data分配缓存大小。
Connection.readAndProcess()的方法代码如下:
/** * 读取客户端发送的数据并根据数据类型进行相应的处理 * @return 返回读取到数据的长度 */ public int readAndProcess() throws IOException, InterruptedException { while (true) { /* Read at most one RPC. If the header is not read completely yet * then iterate until we read first RPC or until there is no data left. */ int count = -1; //dataLengthBuffer是四个字节,用于读取IPC连接魔数和数据长度 if (dataLengthBuffer.remaining() > 0) { count = channelRead(channel, dataLengthBuffer); if (count < 0 || dataLengthBuffer.remaining() > 0) return count; } if (!rpcHeaderRead) {//如果RPC头部还没有读取那么就读取RPC头部 //Every connection is expected to send the header. if (rpcHeaderBuffer == null) { rpcHeaderBuffer = ByteBuffer.allocate(2); } count = channelRead(channel, rpcHeaderBuffer); if (count < 0 || rpcHeaderBuffer.remaining() > 0) { return count; } int version = rpcHeaderBuffer.get(0);//第一个字节是接口协议版本 byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//认证方式的code authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method))); dataLengthBuffer.flip(); if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + " got version " + version + " expected version " + CURRENT_VERSION); return -1; } dataLengthBuffer.clear(); if (authMethod == null) { throw new IOException("Unable to read authentication method"); } if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { AccessControlException ae = new AccessControlException("Authorization (" + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ") is enabled but authentication (" + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ") is configured as simple. Please configure another method " + "like kerberos or digest."); setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage()); responder.doRespond(authFailedCall); throw ae; } if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { doSaslReply(SaslStatus.SUCCESS, new IntWritable( SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null); authMethod = AuthMethod.SIMPLE; // client has already sent the initial Sasl message and we // should ignore it. Both client and server should fall back // to simple auth from now on. skipInitialSaslHandshake = true; } if (authMethod != AuthMethod.SIMPLE) { useSasl = true; } rpcHeaderBuffer = null; rpcHeaderRead = true;//RPC头部读取完成 continue;//接下来读取数据部分:连接头部或方法数据 } if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); if (dataLength == Client.PING_CALL_ID) { if(!useWrap) { //covers the !useSasl too dataLengthBuffer.clear(); return 0; //ping message } } if (dataLength < 0) { LOG.warn("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } data = ByteBuffer.allocate(dataLength); } count = channelRead(channel, data); if (data.remaining() == 0) {//读到一个完整的消息 dataLengthBuffer.clear(); data.flip(); if (skipInitialSaslHandshake) { data = null; skipInitialSaslHandshake = false; continue; } boolean isHeaderRead = headerRead; if (useSasl) { saslReadAndProcess(data.array()); } else { processOneRpc(data.array()); } data = null; if (!isHeaderRead) {//如果isHeaderRead为false,则刚刚在processOneRpc()方法中处理的是ConnectionHeader,那么就继续处理数据部分 continue; } } return count; } }有一点需要注意的是Reader.run()方法中使用Java NIO方式处理客户端发送过来的数据,由于网络发送数据的不确定性,不一定每次接收都会接收一个完整的消息,所以在readAndProcess()方法中dataLengthBuffer和data这两个缓存都需要等到读取完成之后,才能进行下一步的处理,否则方法返回刚刚读取到的数据字节数count,继续等待读取下一次到达的数据。dataLengthBuffer缓存读满,表示IPC魔数读取完成或者是数据长度读取完成,只有IPC魔数读取完成,才能判断这个过程是否出错,只有数据长度读取完成,才能判断数据部分的长度,从而确定数据缓存的大小。data缓存读满表示读取到一个完整的消息(连接头部数据或者是一次调用数据),这样才能正确的进行下一步的处理。
在readAndProcess()方法中也处理了客户端发送过来的ping请求,因为客户端在与服务器端连接上以后,需要与服务器保持连接,所以会每隔一定时间发送一个ping请求,这个暂且不分析。
处理从客户端接收的数据主要集中在processOneRpc()方法中,首先根据变量headerReader判断是否读取完了客户端IPC调用的头部信息,如果还没有读取头部信息,那么就调用processHeader()方法进行处理,然后设置headerRead为true,如果已经读取了头部信息headerRead的值就是false,就会调用processData()方法,源代码如下:
/** * readAndProcess()方法接收到客户端RPC调用的数据之后,在这个方法里进行处理 */ private void processOneRpc(byte[] buf) throws IOException, InterruptedException { if (headerRead) { processData(buf); } else { processHeader(buf); headerRead = true; if (!authorizeConnection()) { throw new AccessControlException("Connection from " + this + " for protocol " + header.getProtocol() + " is unauthorized for user " + user); } } }
对于客户端发送过来的信息,服务器端如何进行处理呢?Hadoop自建了一种序列化的机制,序列化就是[透过网络传送资料时进行编码的过程,可以是字节或是XML等格式。而字节的或XML编码格式可以还原完全相等的对象。这程序被应用在不同应用程式之间传送对象,以及服务器将对象储存到档案或数据库。相反的过程又称为反序列化。fromhttp://zh.wikipedia.org/wiki/%E5%BA%8F%E5%88%97%E5%8C%96]Hadoop进行序列化就要实现Writable接口,然后实现方法和方法。write()方法就是将该对象序列化,生成该对象对应的字节码数据,这个过程就是序列化,readFields()方法与之相反,将write()方法生成的字节数据反序列化生成一个Java对象,这个过程称为反序列化。processHeader()方法先将ConnectionHeader对象对应的序列化数据反序列化(通过ConnectionHeader.readFields()方法),生成一个ConnectionHeader对象,然后再通过这个对象生成其他的对象和进行权限检查,具体可以查看processHeader()方法。processData()方法首先读取一个int类型数据,即客户端发送的调用(Call)对象的ID,然后再反序列化客户端发送的Writable类型(在这里实际是Invocation类型)的参数,最后生成一个Call类型的对象放入一个阻塞队列callQueue中,等待服务器端进行调用,具体的方法代码如下:
/** * 服务器端处理客户端的方法调用,生成一个Call对象,再将Call对象放入一个阻塞队列中 */ private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id if (LOG.isDebugEnabled()) LOG.debug(" got #" + id); Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param param.readFields(dis); Call call = new Call(id, param, this); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count }
方法调用
在Reader线程中,将调用对象(Call对象)生成并放入阻塞队列callQueue中以后,下面处理方法调用就由Hadnler线程负责了。Handler线程是在Server.start()方法中创建并启动的,代码如下:
/** Starts the service. Must be called before any calls will be handled. * 该方法调用之后,所有线程才开始工作 * */ public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }在Hadoop IPC中,Handler线程主要用于处理方法Reader线程读取并准备好的方法调用,即Call对象,Handler线程从阻塞队列callQueue中取出要调用的方法,然后进行处理。Handler有多个,其数量在创建Server对象的时候通过构造函数的参数指定,如通过RPC.getServer()方法。Handler线程在其run()方法中重复的读取callQueue这个阻塞队列,每次取出一个Call对象,调用RPC.Server.call()方法进行最终的方法调用,Server.java文件中的Server类是一个抽象类,RPC.Server类是它的一个子类,这个call()方法正是在这个方法中实现的。Handler类的run()方法代码如下:
public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { try { final Call call = callQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + call.connection); String errorClass = null; String error = null; Writable value = null; CurCall.set(call); try { // Make the call as the user via Subject.doAs, thus associating // the call with the Subject if (call.connection.user == null) { value = call(call.connection.protocol, call.param, call.timestamp); } else { value = call.connection.user.doAs (new PrivilegedExceptionAction<Writable>() { @Override public Writable run() throws Exception { // make the call return call(call.connection.protocol, call.param, call.timestamp); } } ); } } catch (Throwable e) { String logMsg = getName() + ", call " + call + ": error: " + e; if (e instanceof RuntimeException || e instanceof Error) { // These exception types indicate something is probably wrong // on the server side, as opposed to just a normal exceptional // result. LOG.warn(logMsg, e); } else if (exceptionsHandler.isTerse(e.getClass())) { // Don't log the whole stack trace of these exceptions. // Way too noisy! LOG.info(logMsg); } else { LOG.info(logMsg, e); } errorClass = e.getClass().getName(); error = StringUtils.stringifyException(e); } CurCall.set(null); synchronized (call.connection.responseQueue) { // setupResponse() needs to be sync'ed together with // responder.doResponse() since setupResponse may use // SASL to encrypt response data and SASL enforces // its own message ordering. setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); // Discard the large buf and reset it back to // smaller size to freeup heap if (buf.size() > maxRespSize) { LOG.warn("Large response size " + buf.size() + " for call " + call.toString()); buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); } responder.doRespond(call); } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (Exception e) { LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } LOG.info(getName() + ": exiting"); }上面的方法代码比较清晰,先从callQueue队列中取出一个Call对象,再调用方法call()调用,最后通过Responder线程将调用的结果发送给客户端。这个过程在下一篇文章中再详细分析
--EOF
Reference
《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》
评论暂时关闭