Hadoop RPC通信协议


Hadoop RPC客户端(Client)向RPC建立连接时向RPC服务器发送两部分内容:RPC HeaderHeaderRPC Header的格式如下:

private void writeRpcHeader(OutputStream outStream) throws IOException {
			DataOutputStream out = new DataOutputStream(
					new BufferedOutputStream(outStream));
			// Write out the header, version and authentication method
			out.write(Server.HEADER.array());
			out.write(Server.CURRENT_VERSION);
			authMethod.write(out);
			out.flush();
		}

其中Server.HEADERByteBuffer.wrap("hrpc".getBytes())SERVER.CURRENT_VERSION 4,最后为authMethod

RPC Header发送完成后,Client会发送header部分,其具体内容如下:

private void writeHeader() throws IOException {
			// Write out the ConnectionHeader
			DataOutputBuffer buf = new DataOutputBuffer();
			header.write(buf);

			// Write out the payload length
			int bufLen = buf.getLength();
			out.writeInt(bufLen);
			out.write(buf.getData(), 0, bufLen);
		}

Header的主要包含三部分的内容:

class ConnectionHeader implements Writable {
  private String protocol;
  private UserGroupInformation ugi = null;
  private AuthMethod authMethod;
  ......
}

RPC HeaderHeader在连接建立的时候发送并且只发送一次,在此之后,Client会发送每一次方法调用相关的信息:

Call call = new Call(param);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter

下面看一下Call的具体内容:

private class Call {
		int id; // call id
		Writable param; // parameter
		......
}

其中id是本次方法调用的唯一标识,paramInvocation的一个实例,Invocation包含了三部分内容:要调用方法的名字、方法调用参数类型数组、参数数组。

private static class Invocation implements Writable, Configurable {
    private String methodName;
    private Class[] parameterClasses;
  private Object[] parameters;
  ......
}

Call发送的具体代码如下:

d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); // first put the data length
					out.write(data, 0, dataLength);// write the data
					out.flush();

综上所述,RPC协议的具体格式如下:

RPC Header

Header

Call

Call

.....

RPC Header:

‘hrpc’

4

authMethod

Header:

Header长度(Int)

ConnectionHeader

Call

Call长度(INT)

Call id(int)

方法名

参数类型数组

参数数组

下面我们看一下RPC服务器端是如何采用上面的协议进行交互的。按照设想,RPC服务器接受客户端的连接请求后,服务器首先读取RPC Header,再读取Header,最后不断的读取方法调用(Call)。服务器端的Socket读取都是通过Reader内部类最终由Server内部类Connection类来读取。为了顺利读取Server.Connection设置了几个内部变量,如下图所示。这里要特别说明一下dataLengthBuffer这个变量,因为这个变量的名字存在歧义。在第一次读取的时候(读取RPC Header的时候),dataLengthBuffer保存的是‘hrpc’的byte数组;在其他时候(读取Header的时候或者Call的时候)保存的是长度(int)。除此之外,还需要注意的是RPC采用的异步通信的模式,服务器采用如下的方式判断一次读取已经完成——如果读取的结果为-1或者buffer没有读满,则表明本次读取已经完成但是数据还没有完全读取完成,需要继续等待下一次读取。

boolean rpcHeaderRead = false; //判断RPCHeader是否已经读取完成
boolean headerRead = false; //判断Header是否已经读取完成
ByteBuffer data;//Header的数据或者每一次Call的数据
ByteBuffer dataLengthBuffer;//长度,Header的长度、Call的长度或者



相关内容

    暂无相关文章