Hadoop RPC (continued)
Picking up from the previous post, let's look at the server side.
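Before reading the code, picture what the server has to do: start the service, listen on a port, read connection requests and request data from clients, process them, and finally send back the results. The classes involved, in that order, are Listener, Connection, Call, Handler, and Responder, all of them inner classes of Server.

From the HDFS analysis posts we know that both NameNode and DataNode start an RPC service. Starting from NameNode's main method, we find that the RPC server is created inside createNameNode. Step into createNameNode; its last line is return new NameNode(conf);. Keep going: the NameNode constructor delegates to an overloaded constructor, and inside that one we step into initialize, which contains the line rpcServer = createRpcServer(conf);. It is a winding path, but if you keep following it you eventually reach the NameNodeRpcServer constructor, which is fairly long. It contains the following statement: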
    this.clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService).setBindAddress(bindHost)
        .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

Click into the build() call at the end, and inside it click into the getServer call on its last line. After tracing all this way, this is what we are shown???
    RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
        int port, int numHandlers, int numReaders,
        int queueSizePerHandler, boolean verbose,
        Configuration conf,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig
        ) throws IOException;

Enough complaining. This is only a declaration: the method belongs to the RpcEngine interface. Pick one implementation, WritableRpcEngine, and see how it implements getServer:
    public RPC.Server getServer(Class<?> protocolClass,
        Object protocolImpl, String bindAddress, int port,
        int numHandlers, int numReaders, int queueSizePerHandler,
        boolean verbose, Configuration conf,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig)
        throws IOException {
      return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
          numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
          portRangeConfig);
    }

Click into new Server(...) and keep following the calls; you end up in the constructor of the Server class itself. Part of that constructor looks like this:
    listener = new Listener();    // the server-side listener
    this.port = listener.getAddress().getPort();
    connectionManager = new ConnectionManager();
    this.rpcMetrics = RpcMetrics.create(this, conf);
    this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
    this.tcpNoDelay = conf.getBoolean(
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

    // Create the responder here
    responder = new Responder();  // the server-side responder

Go into Listener first:
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);  // plain Java socket address
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();  // Java NIO (covered in an earlier post): open a channel
      acceptChannel.configureBlocking(false);      // switch to non-blocking mode

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);  // bind to the given host and port
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();          // the Java NIO selector
      readers = new Reader[readThreads];  // Hadoop starts several threads for read events
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();                   // start the reader thread
      }

      // Register accepts on the server socket with the selector.
      // select() returns once a client asks to connect and blocks until then.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

This constructor prepares the server to accept client connections and starts several Reader threads, which read the data sent by the many clients.
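To make the accept-then-read pattern concrete, here is a minimal, self-contained sketch using only the JDK's NIO classes. It is not Hadoop code: the class MiniListener and the method acceptAndReadOnce are hypothetical names, and for brevity everything runs on one thread with two selectors, whereas Hadoop gives each Reader its own thread.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not part of Hadoop.
public class MiniListener {

    /** Accepts one loopback connection via a selector and returns what the client sent. */
    public static String acceptAndReadOnce(String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel acceptChannel = ServerSocketChannel.open();
             Selector acceptSelector = Selector.open();
             Selector readSelector = Selector.open()) {
            acceptChannel.configureBlocking(false);
            acceptChannel.bind(new InetSocketAddress(loopback, 0)); // port 0: ephemeral port
            int port = acceptChannel.socket().getLocalPort();
            acceptChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);

            // The client is a plain blocking channel in the same process.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out);
                }

                acceptSelector.select();                 // blocks until a connection is pending
                acceptSelector.selectedKeys().clear();
                SocketChannel accepted = acceptChannel.accept();
                if (accepted == null) {
                    throw new IOException("no pending connection");
                }
                try (SocketChannel conn = accepted) {
                    conn.configureBlocking(false);
                    // Hand the connection to the "reader" selector for read events.
                    conn.register(readSelector, SelectionKey.OP_READ);
                    ByteBuffer in = ByteBuffer.allocate(payload.length);
                    while (in.hasRemaining()) {
                        readSelector.select();           // blocks until data is readable
                        readSelector.selectedKeys().clear();
                        if (conn.read(in) < 0) {
                            break;                       // client closed the connection early
                        }
                    }
                    return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(acceptAndReadOnce("ping"));
    }
}
```

Splitting accept events and read events across separate selectors is what lets a server like this keep accepting new clients while slow clients are still sending their requests.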
Next, go into the other object created a few lines below the Listener in the Server constructor, responder = new Responder():
    Responder() throws IOException {
      this.setName("IPC Server Responder");
      this.setDaemon(true);             // mark as a daemon thread
      writeSelector = Selector.open();  // create a selector
      pending = 0;
    }

Responder extends Thread, so look at its run method:
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      try {
        doRunLoop();  // step into this
      } finally {
        LOG.info("Stopping " + Thread.currentThread().getName());
        try {
          writeSelector.close();
        } catch (IOException ioe) {
          LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
        }
      }
    }

The doRunLoop method:
    private void doRunLoop() {
      long lastPurgeTime = 0;   // last check for old calls.

      while (running) {         // as long as the server is running
        try {
          waitPending();        // If a channel is being registered, wait.
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();  // iterate over the selected keys
          while (iter.hasNext()) {  // check each key for a write event
            SelectionKey key = iter.next();
            iter.remove();          // a handled key must be removed from the selected set
            try {
              if (key.isValid() && key.isWritable()) {
                doAsyncWrite(key);  // note: an asynchronous write; step into it
              }
            } catch (IOException e) {
              LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
            }
          }
          long now = Time.now();
          if (now < lastPurgeTime + PURGE_INTERVAL) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          if(LOG.isDebugEnabled()) {
            LOG.debug("Checking for old call responses.");
          }
          ArrayList<Call> calls;

          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) {
                calls.add(call);
              }
            }
          }

          for(Call call : calls) {
            doPurge(call, now);
          }
        } catch (OutOfMemoryError e) {
          //
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          //
          LOG.warn("Out of Memory in server select", e);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          LOG.warn("Exception in Responder", e);
        }
      }
    }

The doAsyncWrite(SelectionKey key) method:
    private void doAsyncWrite(SelectionKey key) throws IOException {
      Call call = (Call)key.attachment();
      if (call == null) {
        return;
      }
      if (key.channel() != call.connection.channel) {
        throw new IOException("doAsyncWrite: bad channel");
      }

      synchronized(call.connection.responseQueue) {
        if (processResponse(call.connection.responseQueue, false)) {  // processes the response, as the name says; step in
          try {
            key.interestOps(0);
          } catch (CancelledKeyException e) {
            /* The Listener/reader might have closed the socket.
             * We don't explicitly cancel the key, so not sure if this will
             * ever fire.
             * This warning could be removed.
             */
            LOG.warn("Exception while changing ops : " + e);
          }
        }
      }
    }

The processResponse method looks like this:
    private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) {  // take the lock
          //
          // If there are no items for this channel, then we are done
          //
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //
          // Extract the first call
          //
          // removeFirst takes from the head of the LinkedList, so responses
          // go out in FIFO order as long as they are appended at the tail
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
          }
          //
          // Send as much data as we can in the non-blocking fashion
          //
          // Write the server's response to the channel and get back the number
          // of bytes written; there are several more layers below, skipped here
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (numBytes < 0) {  // a negative count is not "finished": error is still true at this point
            return true;
          }
          if (!call.rpcResponse.hasRemaining()) {  // everything was written, so release the buffer
            //Clear out the response buffer so it can be collected
            call.rpcResponse = null;
            call.connection.decRpcCount();

The excerpt stops here (the rest of the method is omitted), and that is more or less the end of the server-side write path.
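Since the excerpt is cut off, here is a minimal sketch of the general technique it relies on: writing the head of a response queue without blocking and putting it back when only part of it could be sent. This is a simplification under stated assumptions, not the omitted remainder of Hadoop's method. ResponseQueueSketch, ThrottledChannel (a stand-in for a socket that accepts only a few bytes per call), and drain are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration, not part of Hadoop.
public class ResponseQueueSketch {

    /** Stand-in for a non-blocking socket: accepts at most `limit` bytes per write call. */
    public static final class ThrottledChannel implements WritableByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit; // assumed to be positive

        public ThrottledChannel(int limit) { this.limit = limit; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }

        public String written() {
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** One write attempt on the head of the queue; true means the queue is now empty. */
    public static boolean processResponse(Deque<ByteBuffer> queue,
                                          WritableByteChannel channel) throws IOException {
        synchronized (queue) {
            ByteBuffer head = queue.pollFirst();
            if (head == null) {
                return true;              // nothing queued for this channel
            }
            channel.write(head);          // may send only part of the buffer
            if (head.hasRemaining()) {
                queue.addFirst(head);     // partial write: keep it at the head for the next attempt
                return false;
            }
            return queue.isEmpty();
        }
    }

    /** Retries until the queue is empty and returns the number of write attempts. */
    public static int drain(Deque<ByteBuffer> queue, WritableByteChannel channel) throws IOException {
        int attempts = 1;
        while (!processResponse(queue, channel)) {
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) throws IOException {
        Deque<ByteBuffer> queue = new ArrayDeque<>();
        queue.addLast(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        queue.addLast(ByteBuffer.wrap("world!".getBytes(StandardCharsets.UTF_8)));
        ThrottledChannel channel = new ThrottledChannel(4);
        int attempts = drain(queue, channel);
        // prints "4 attempts, wrote: helloworld!"
        System.out.println(attempts + " attempts, wrote: " + channel.written());
    }
}
```

The drain loop simply retries at once to keep the example short. A selector-based server would instead wait for the channel to become writable before the next attempt, which is the role the Responder's writeSelector plays in the code above.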
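To be honest, Hadoop was built by many of the world's top engineers, and after countless upgrades it really is complex and has to be read slowly. There are many implementation details in there that I have not looked at closely; it is, after all, several hundred thousand lines of code.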