《RocketMq》一、网络传输篇
https://www.yqxbc.com win10系统 发布时间:2018-08-08 13:17 来源:一起学编程 浏览:加载中

你是否想知道一个分布式系统的网络传输解决方案,那你可以学习下RocketMQ的网络传输原理,从RocketMQ的Remoting网络处理部分,可以学习到如何进行高效的网络传输,这些思想可以应用到不同的业务中。

 

一、要解决的问题

         其实大部分应用的网络处理都要解决如下图所示的问题:

 

 

 

 

那么就以RocketMQ的源码入手,看看它是如何架构如上的结构的。

 

二、RocketMQ-remoting详解

2.1首先给出其整体的结构图

 

2.2 编码解码

在RocketMQ中,所有的通讯都是使用RemotingCommand这个结构,这个结构的内容如下:

 

private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
// 1, RESPONSE_COMMAND

private static final int RPC_ONEWAY = 1; // 0, RPC
// 1, Oneway

/**
 * Header 部分
 */
private int code; // 用于标示请求类型,参见RequestCode,ResponseCode
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = RequestId.getAndIncrement(); // 每个消息的唯一标志,request和response通过该字段匹配
private int flag = 0;
private String remark;
private HashMap<String, String> extFields; // 传输时使用,CommandCustomHeader转为该结构<key,value>后,再统一转为json传输。因此
CommandCustomHeader只能是String,Int,Long等基础数据结构,不能是复合数据结构

private transient CommandCustomHeader customHeader; // 业务逻辑中使用该结构,传输时,使用 extFields
/** * Body 部分 */private transient byte[] body;

 

 

2.2.1 RemotingCommand转为网络传输数据

在MQ中,所有数据传输都使用该数据结构进行数据传输,当把数据转为网络传输时,会将customHeader转为HashMap的extFields,再转为json串

 

2.2.2 传输格式:

Length

Header length

Header data

Body

 

2.2.3 编码过程(重点函数:makeCustomHeaderToNet)

A. 将业务上的CustomHeader转为extFields;

B. 然后调用RemotingSerializable的encode,将RemotingCommand的Header部分转为byte类型

C. 在按照传输格式,将数据转为最终的header+body结构进行传输

 

2.2.4 解码过程(重点函数:decodeCommandCustomHeader)

A. 首先将获取的byteBuffer,按照传输格式进行解包,得到其headerData和bodyData

B. 将HeaderData部分进行decode,解包为RemotingCommand

C. 业务层调用decodeCommandCustomHeader(m.class)将头部解析为对应的m类

 

 

备注:

transient:当序列化类时,有该属性的变量不进行序列化

 

2.3 通信层处理

Netty在处理通信层的事件时,将其NettyEventExecuter的eventQueue中,再起一个线程,不断地处理存入消息。

2.3.1 Put消息

	在Netty的注册部分,handler在addLast的时候,将NettyConnetManageHandler注册进去;这里面对应了connect、disconnect、close、channelRegistered等等事件,对于这些事件,将会调用NettyEventExecuter.putNettyEvent将消息放入Queue中;
2.3.2 Get处理消息
	 NettyEventExecuter处理线程会不断从queue中读取消息进行处理,调用注册的ChannelEventListener进行处理;
 

2.4 业务层处理

2.4.1 NettyRemotingAbstract:它是作为NettyRemotingServer和NettyRemotingClient的基类,对发送和接收的公共部分进行了处理

A. 数据结构和基础函数

A.1 首先保存了RPC处理器:HashMap<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

A.2 其次保存了所有对外请求ConcurrentHashMap<Integer/* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer,ResponseFuture>(256);

A.3  scanResponseTable:扫描responseTable,将超时的ResponseFuture直接移除

 

B. 发送部分

B.1 invokeSyncImpl: 同步发送,发送时,生成ResponseFuture,放入responseTable中;然后发送后等待设置的timeout(3s)时间,如果对应的ResponseFuture为空,则报错;否则返回RemoteCommand进行业务逻辑处理;

B.2 invokeAsyncImpl:异步发送,发送时,生成ResponseFuture,放入responseTable中;如果超过scanResponseTable的timeout (30s),则报错;否则调用注册的invokeCallback进行回调处理;

B.3 invokeOnewayImpl:单向发送,不将消息写入responseTable中,直接返回;

 

C. 接收消息部分

C.1 processRequestCommand:接收消息,作为Server端,接收的消息是请求,那么调用processTable对应的事件进行处理

C.2 processResponseCommand:接收消息,作为Client端,接收的消息是回复,即接收到Server端的回复,那么从responseTable中,首先获取opaque对应的ResponseFuture,如果这个response是异步回调,则有InvokeCallback,那么调用invokeBack函数,然后将Response塞入ResponseFuture后返回;

 

 

 

2.4.2 NettyRemotingServer

处理过程如下:

首先所有的入口都在start函数:

如果是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyServerHandler

NettyDecoder(底层编码):会将数据包从byte转为RemotingCommand

NettyConnectManageHandler(通信层事件):会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理

NettyServerHandler(业务层事件):调用注册的<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable,进行业务逻辑处理,当processRequestCommand接收到消息时,进行对应的处理

 

2.4.3 NettyRemotingClient

首先所有的入口都在start函数:

如果是input方向,那么会先调用NettyDecoder->NettyConnectManageHandler->NettyClientHandler

NettyDecoder:会将数据包从byte转为RemotingCommand

NettyConnectManageHandler:会将请求转入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,对应的调用NettyRemotingAbstract.putNettyEvent将事件放入Queue中,等待NettyEventExecuter进行处理

NettyClientHandler:调用注册的<Integer/* request code */,Pair<NettyRequestProcessor, ExecutorService>> processorTable, 进行业务逻辑处理

ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>():一个remotingClient会管理很多个channel

最后,是client端的超时时间处理,如果连接超过120s没有接收到发送和请求,那么将会断开连接,否则将会是长连接的一个保持。

一个实例:在producer,consumer的连接保持中,虽然有120s的超时时间,但是他们基本都是长连接的一个保持,因为会通过心跳来保持所有的连接。

2.5 后台服务:

1. NettyEventExecutor和ChannelEventListener:主要负责处理connect,disconnect,close等消息。

2. scanResonseTable : 主要负责清理过期超时的response。

3. 异步回调:不算是标准的后台服务,当采用async的发送方式或sync的回调模式时,会在后台线程中执行。

 

三. 一些总结与想法

在整个网络传输部分,有如下值得思考借鉴的地方:

3.1 编解码:编码要节省资源,常常使用bit,位为单位进行编码,最终转为json或xml传输(当然还可以选择probuf等)

3.2 限流的使用,这里采用semaphore来进行限流处理

3.3 rpchook的设计,发送前,接受后的hook设计

3.4 发送与接收

发送:invokeAsyncImpl, invokeOnewayImpl, invokeSyncImpl

接收:processReceiveMessage, processRequestMessage, processResponseMessage

发送与接收使用opaque和responseFuture进行交互(即ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable),其中,opaque用于标示发送/接收对,responseFuture的countDownLatch字段用于通知客户端接收到消息,并控制超时时间。

3.5 后台服务的设计

有很多服务不需要是实时的,需要在一致性和可用性之间找到一个平衡,因此,很多非实时任务可以采用一个全局的单线程来维护,参考上面2.5的描述。