RocketMQ 生产者启动流程

本文最后更新于:2022年7月29日 下午

简单介绍 RocketMQ 生产者启动流程

RocketMQ 生产者启动流程

还记得上一个章节的生产者发送消息的代码吗?我们在构造函数中新建了 DefaultMQProducer 类,并且设置了 NamesrvAddr 属性,它标志着 Namesrv 的地址,随后我们调用了 prodeucer.start() 来启动这个生产者,接下来我们就要探究 start() 方法究竟做了什么。

在书写本章时的代码版本为 RocketMQ-Client 4.3.0 , git节点在 b4240d5cea8,如果代码有所不同请见谅

DefaultMQProducer

DefaultMQProducer 类是 RocketMQ 生产者的默认实现,它继承了 ClientConfig 类 并间接实现了了 MQAdmin 类,类图见下图,接下来我们来讲一下这一个类一个接口。

image-20210626102312384

ClientConfig

在这里我列举了 ClientConfig 的几个重要属性,和本章中比较重要的方法:buildMQClientId,这里ClientID主要是为了区分多台服务器的不同JVM实例,可以看到 ClientIDClientIPInstanceNameunitName 组成,其中 ClientIP 分区了不同的服务器, Instancename 区分了不同JVM实例,unitName 区分了一个JVM实例中的不同子实例。如果我们不设置 InstanceNameunitName ,就可以使得 在JVM实例中只存在一个ClientID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private String namesrvAddr
private String clientIP;
private String instanceName;
private boolean unitMode = false;
private String unitName;

public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());

sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}

return sb.toString();
}

MQAdmin

MQAdmin接口定义了一些非常基本的方法,比如说createTopicsearchOffset 之类的。

生产者启动流程

Step1

我们从 DefaultMQProducer.start()方法开始追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

首先我们重新设置了生产者的生产者组名称,这里是为了兼容 NameSpace ,其中 NameSpaceClientConfig 中配置,如果我们未配置的话则为原样的ProducerGroup,之后我们调用了 defaultMQProducerImpl 来启动生产者,那么这个defaultMQProducerImpl是什么呢。我们可以从上述代码中的第10行看出,他是一个DefaultMQProducerImpl类,也就是默认的生产者实现类,实际上生产者的消息发送都是通过这个类来实现的 ,注意:我们通过构造方法把我们自身传递给了DefaultMQProducerImpl,这也是说在DefaultMQProducerImpl可以访问生产者中的任何公开信息。

Step2

那么我们切换目标来跟踪defaultMQProducerImpl.start();方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private ServiceState serviceState = ServiceState.CREATE_JUST;
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 具体的生产者启动流程
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}

这一大段代码是为了保证每一个生产者只启动一次。接下来我们就开始看看具体的流程吧

Step3

1
2
3
4
5
this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

首先检查prodeucerGroup是否符合要求,如果 producerGroup 不是 CLIENT_INNER_PRODUCER_GROUP 并且没有手动设置 InstanceName 的话就会重新设置一下 InstanceName 为 当前JVM程序运行的 PID。(这里其实可以思考一下为什么要这样做?)

Step4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# DefaultMQProducerImpl.java
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

# MQClientManager.java

private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}

return instance;
}

这里可以跳转到 附录 查看MQClientManager 类的介绍。这段代码就是为了创建 MQClientInstance 对象,这里我们可以看出其实我们是基于ClientID来获取 MQClientInstance 对象的。而根据上述代码我们可以轻松的知道:如果我们未手动配置 InstanceNameunitName 的话,一个JVM实例只会有一个 ClientIDClientID=ClientIP+PID) ,并且通过代码我们知道这个实例是缓存在了 factoryTable 中,也就是说,在一个JVM实例中一般只会存在一个MQClientInstance 实例。

MQClientInstance 封住了RocketMQ网络处理API,是Producer、Consumer与NamServer、Broker打交道的网络对象

Step5

我们获取到了 MQClientInstance 之后,我们就要开始往 MQClientInstance 中注册我们的生产者。

1
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

注册的过程也很简单,只是更新了一下 MQClientInstanceproducerTable 生产者列表而已。这里我们可以得出一个简单的结论:一个JVM实例中只存在一个 ProducerGroup 的生产者,这是为了使得一个JVM程序崩溃之后不会影响一个生产者组中的多个生产者。(其实)

Step6

增加默认的Topic信息。并且启动 MQClientInstance

1
2
3
4
5
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
mQClientFactory.start();
}

MQClientManager类

MQClientManager 是用来维护 MQClientInstance 的。一个JVM实例中只会存在一个 MQClientInstance 实例,同时也为 MQClientManager 的实现是在是太轻量化了,所以RocketMQ的作者yukong大佬也是直接使用了饿汉式的单例模式。

1
2
3
4
5
6
7
8
9
10
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
private MQClientManager() {
}
public static MQClientManager getInstance() {
return instance;
}
}

同时内部维护了一套 ClientIDMQClientInstance 的对应关系,并且提供了方法去添加、移除,这里不做更多的介绍。

写在最后

讲道理写生产者启动流程还是蛮累的。本以为会比较轻松,因为相对于一些其他流程来说应该是相对比较简单的,没想到还是花了两天的时间陆陆续续的写完。

现在是6月27日的晚上9点45分,周日,没想到一周唯一的休息日就这样在睡觉中度过去了(是真的睡了一天),最后到了晚上吃完饭、吃完水果才想到博客没有写完,今天施队还说我吃饭吃的太少,水果吃的多,怕是会的糖尿病,整个人估计是废了吧。

image-20210627215117279


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!