《RocketMq》三、NameServer
https://www.yqxbc.com win10系统 发布时间:2018-08-08 13:19 来源:一起学编程 浏览:加载中

RocketMQ-nameSrv用于管理所有broker的信息,以便于Producer和Consumer能够获取到正确的Broker信息,进行业务处理;

可以看到NameSrv的主要管理内容如下:

1. 接收Broker的注册,注销请求;

2. Producer获取topic下的所有BrokerQueue,put消息

3. Consumer获取topic下所有的BrokerQueue,get消息

所以可以看到NameSrv主要是维护了Broker相关的内容,进行存取;

 

问题:

1. 如果readQueue和writeQueue数目不一样怎么办?比如readQueue<writeQueue,这不就意味着有的writeQueue没有办法被均分到,读到数据?

 

2. haServer,masterServer,slave的区别?

haServer:master用来监听slave复制数据的端口

masterServer:主Broker

slave:从Broker,会不断从master复制数据

 

3. master和slave的brokerId?

master是0

 

总结:从NameSrv可以看出,rocketmq与kafka的最大区别就是不再采用zookeeper,因为rq的broker的matser和slave是物理概念,而不是选举出来的master。同时,我们在设计系统的时候,有一些全局使用的公用信息,可以单独独立一个模块进行专门的管理;只需要各个子模块定时向该模块更新信息即可,这样既可以有一个全局概览,也使模块轻量级,不会有太大压力

 

 

一、总体结构

 

二、数据结构

2.1 QueueData:保存了某个broker的read,write的Queue的数目,Consumer和Producer会分别往read/write里面取存消息

private String brokerName;//broker名字
private int readQueueNums;//读queue数目
private int writeQueueNums;//写queue数目
private int perm;
private int topicSynFlag;


2.2 BrokerData:保存了Broker的Id和地址信息

 

private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

 

 

2.3 BrokerLiveInfo:保存了Broker的心跳信息

 

private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;//master的复制server端口


2.4 

 

 

//保存topic-queue信息
HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//保存broker地址信息
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;////brokerAddrTable(brokerName, brokerData); brokerData(brokerId, brokerAddr);
//保存broker-cluster信息
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//保存心跳信息
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable

 

 

三、心跳处理

 

         主要有3个部分:

BrokerHousekeepingService:这个接口会监听网络层的请求,如果有close等关闭请求,直接从brokerLiveTable中移除该数据
 
routeInfoManager.scanNotActiveBroker:遍历所有的BrokerLiveInfo节点,如果上次访问时间距离现在超过2mins,那么就超时移除连接;Broker会定时向NameSrv发送registerBroker,每次注册都会更新BrokeLiveInfo,相当于心跳
 
registerBrokerWithFilterServer:broker会不定时的发送RequestCode= REGISTER_BROKER的消息注册自己,这个消息还有一个附带作用就是充当心跳包

 

四、主要模块

4.1 注册broker(RequestCode= REGISTER_BROKER)

其中请求的头部RegisterBrokerRequestHeader

 

private String brokerName;
private String brokerAddr;
private String clusterName;
private String haServerAddr;
private Long brokerId;


请求的报文体则是 topic 的相关信息

 

 

private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();

 

 

不断的遍历第三节中的各个表,将数据注册进去,master的brokerId是0
如果不是master,是slave节点,会返回master和HAServer的地址
 
4.2 存放消息
Producer在存放消息时,首先会使用GET_ROUTEINTO_BY_TOPIC获取route信息TopicRouteData
 
4.3 读取消息

 

Consumer在读取消息时,也会先使用GET_ROUTEINTO_BY_TOPIC获取route信息TopicRouteData

 

4.4 后台模块

1. 心跳

2. 不断打印注册的kv信息