dubbo中的请求与编/解码

今天又有前端的小伙伴来问dubbo中序列化/反序列化的代码,想起这块我还没研究过,于是看了看源码,这篇文章用来记录dubbo中的请求和接收。

请求的发送

首先我们看DubboInvoker的doInvoke方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected Result doInvoke(Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation)invocation;
String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment("path", this.getUrl().getPath());
inv.setAttachment("version", this.version);
ExchangeClient currentClient;
if(this.clients.length == 1) {
currentClient = this.clients[0];
} else {
currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
}

try {
boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
if(isOneway) {
boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture((Future)null);
return new RpcResult();
} else if(isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture((Future)null);
return (Result)currentClient.request(inv, timeout).get();
}
} catch (TimeoutException var9) {
throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var9.getMessage(), var9);
} catch (RemotingException var10) {
throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
}
}

这个方法我们之前讲过,所以直接看同步调用:

1
2
RpcContext.getContext().setFuture((Future)null);
return (Result)currentClient.request(inv, timeout).get();

最终会走到HeaderExchangeClient:

1
2
3
public ResponseFuture request(Object request) throws RemotingException {
return this.channel.request(request);
}

HeaderExchangeClient调用了HeaderExchangeChannel的request方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ResponseFuture request(Object request) throws RemotingException {
return this.request(request, this.channel.getUrl().getPositiveParameter("timeout", 1000));
}

public ResponseFuture request(Object request, int timeout) throws RemotingException {
if(this.closed) {
throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
} else {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(this.channel, req, timeout);

try {
this.channel.send(req);
return future;
} catch (RemotingException var6) {
future.cancel();
throw var6;
}
}
}

代码比较简单,构建了一个Request对象,然后通过channel的send方法发送请求,不过这里比较奇怪的是,这个方法是同步的,为什么在send之后直接return了future呢,我们来看看future内部做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Object get() throws RemotingException {
return this.get(this.timeout);
}

public Object get(int timeout) throws RemotingException {
if(timeout <= 0) {
timeout = 1000;
}

if(!this.isDone()) {
long start = System.currentTimeMillis();
this.lock.lock();

try {
while(!this.isDone()) {
this.done.await((long)timeout, TimeUnit.MILLISECONDS);
if(this.isDone() || System.currentTimeMillis() - start > (long)timeout) {
break;
}
}
} catch (InterruptedException var8) {
throw new RuntimeException(var8);
} finally {
this.lock.unlock();
}

if(!this.isDone()) {
throw new TimeoutException(this.sent > 0L, this.channel, this.getTimeoutMessage(false));
}
}

return this.returnFromResponse();
}

可以看到dubbo内部在线程中通信上选用了condition。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static class RemotingInvocationTimeoutScan implements Runnable {
private RemotingInvocationTimeoutScan() {
}

public void run() {
while(true) {
try {
Iterator i$ = DefaultFuture.FUTURES.values().iterator();

while(i$.hasNext()) {
DefaultFuture future = (DefaultFuture)i$.next();
if(future != null && !future.isDone() && System.currentTimeMillis() - future.getStartTimestamp() > (long)future.getTimeout()) {
Response timeoutResponse = new Response(future.getId());
timeoutResponse.setStatus((byte)(future.isSent()?31:30));
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}

Thread.sleep(30L);
} catch (Throwable var4) {
DefaultFuture.logger.error("Exception when scan the timeout invocation of remoting.", var4);
}
}
}
}

内部有一个runnable,通过死循环去获取数据,在received中调用condition的signal方法,从而获取数据。

回到上面,真正发送请求是在channel.send中,而这个channel是NettyChannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;

try {
ChannelFuture future = this.channel.write(message);
if(sent) {
timeout = this.getUrl().getPositiveParameter("timeout", 1000);
success = future.await((long)timeout);
}

Throwable cause = future.getCause();
if(cause != null) {
throw cause;
}
} catch (Throwable var7) {
throw new RemotingException(this, "Failed to send message " + message + " to " + this.getRemoteAddress() + ", cause: " + var7.getMessage(), var7);
}

if(!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + this.getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
}
}

这里就通过write方法发送了请求。

编码&解码

回忆之前的文章,在创建nettyServer的时候有这么一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, this.getUrl().getPositiveParameter("iothreads", Constants.DEFAULT_IO_THREADS));
this.bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
this.channels = nettyHandler.getChannels();
this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(NettyServer.this.getCodec(), NettyServer.this.getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
this.channel = this.bootstrap.bind(this.getBindAddress());
}

其中通过netty的pipleline机制加了3个扩展:

1
2
3
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);

我们知道在发送请求,也就是下行的过程中,会以encoder->handler的方式调用,而在接受请求,也就是上行的过程中,会以handler->decoder的顺序调用。

1
2
3
4
5
6
7
8
9
10
11
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), this.url, this.handler);

try {
this.handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}

}

nettyHandler中的writeRequest很简单,就是交由真正的handler去操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private class InternalEncoder extends OneToOneEncoder {
private InternalEncoder() {
}

protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, NettyCodecAdapter.this.url, NettyCodecAdapter.this.handler);

try {
NettyCodecAdapter.this.codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}

return org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}

encoder也比较简单,调用了codec去做encode,而这个codec通过SPI的机制去生成,默认是DubboCodec,是ExchangeCodec的子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = this.getSerialization(channel);
byte[] header = new byte[16];
Bytes.short2bytes(-9541, header);
header[2] = (byte)(-128 | serialization.getContentTypeId());
if(req.isTwoWay()) {
header[2] = (byte)(header[2] | 64);
}

if(req.isEvent()) {
header[2] = (byte)(header[2] | 32);
}

Bytes.long2bytes(req.getId(), header, 4);
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + 16);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if(req.isEvent()) {
this.encodeEventData(channel, out, req.getData());
} else {
this.encodeRequestData(channel, out, req.getData());
}

out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, (long)len);
Bytes.int2bytes(len, header, 12);
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header);
buffer.writerIndex(savedWriteIndex + 16 + len);
}

encodeRequest中以dubbo协议去构造数据(调用DubboCodec的encodeRequestData方法),值得注意的是这里默认使用的Serialization是Hessian2Serialization,也就是说使用的是hessian协议。

请求接受和上面的发送差不多,只不过先调用nettyHandler,再调用decoder,大家可以自行查看源码。