简单介绍
RocketMQ生产者启动流程
RocketMQ 生产者启动流程
还记得上一个章节的生产者发送消息的代码吗?我们在构造函数中新建了 DefaultMQProducer 类,并且设置了 NamesrvAddr 属性,它标志着 Namesrv 的地址,随后我们调用了 prodeucer.start() 来启动这个生产者,接下来我们就要探究 start() 方法究竟做了什么。
在书写本章时的代码版本为
RocketMQ-Client 4.3.0, git节点在b4240d5cea8,如果代码有所不同请见谅
DefaultMQProducer 类
DefaultMQProducer 类是 RocketMQ 生产者的默认实现,它继承了 ClientConfig 类 并间接实现了了 MQAdmin 类,类图见下图,接下来我们来讲一下这一个类一个接口。

ClientConfig
在这里我列举了 ClientConfig 的几个重要属性,和本章中比较重要的方法:buildMQClientId,这里ClientID主要是为了区分多台服务器的不同JVM实例,可以看到 ClientID 由 ClientIP 、InstanceName 、 unitName 组成,其中 ClientIP 分区了不同的服务器, Instancename 区分了不同JVM实例,unitName 区分了一个JVM实例中的不同子实例。如果我们不设置 InstanceName和unitName ,就可以使得 在JVM实例中只存在一个ClientID。
private String namesrvAddr
private String clientIP;
private String instanceName;
private boolean unitMode = false;
private String unitName;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}MQAdmin
MQAdmin接口定义了一些非常基本的方法,比如说createTopic 、searchOffset 之类的。
生产者启动流程
Step1
我们从 DefaultMQProducer.start()方法开始追踪
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}首先我们重新设置了生产者的生产者组名称,这里是为了兼容 NameSpace ,其中 NameSpace 在 ClientConfig 中配置,如果我们未配置的话则为原样的ProducerGroup,之后我们调用了 defaultMQProducerImpl 来启动生产者,那么这个defaultMQProducerImpl是什么呢。我们可以从上述代码中的第10行看出,他是一个DefaultMQProducerImpl类,也就是默认的生产者实现类,实际上生产者的消息发送都是通过这个类来实现的 ,注意:我们通过构造方法把我们自身传递给了DefaultMQProducerImpl,这也是说在DefaultMQProducerImpl可以访问生产者中的任何公开信息。
Step2
那么我们切换目标来跟踪defaultMQProducerImpl.start();方法
private ServiceState serviceState = ServiceState.CREATE_JUST;
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 具体的生产者启动流程
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}这一大段代码是为了保证每一个生产者只启动一次。接下来我们就开始看看具体的流程吧
Step3
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}首先检查prodeucerGroup是否符合要求,如果 producerGroup 不是 CLIENT_INNER_PRODUCER_GROUP 并且没有手动设置 InstanceName 的话就会重新设置一下 InstanceName 为 当前JVM程序运行的 PID。(这里其实可以思考一下为什么要这样做?)
Step4
# DefaultMQProducerImpl.java
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
# MQClientManager.java
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}这里可以跳转到 附录 查看MQClientManager 类的介绍。这段代码就是为了创建 MQClientInstance 对象,这里我们可以看出其实我们是基于ClientID来获取 MQClientInstance 对象的。而根据上述代码我们可以轻松的知道:如果我们未手动配置 InstanceName 和 unitName 的话,一个JVM实例只会有一个 ClientID(ClientID=ClientIP+PID) ,并且通过代码我们知道这个实例是缓存在了 factoryTable 中,也就是说,在一个JVM实例中一般只会存在一个MQClientInstance 实例。
MQClientInstance封住了RocketMQ网络处理API,是Producer、Consumer与NamServer、Broker打交道的网络对象
Step5
我们获取到了 MQClientInstance 之后,我们就要开始往 MQClientInstance 中注册我们的生产者。
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);注册的过程也很简单,只是更新了一下 MQClientInstance 的 producerTable 生产者列表而已。这里我们可以得出一个简单的结论:一个JVM实例中只存在一个 ProducerGroup 的生产者,这是为了使得一个JVM程序崩溃之后不会影响一个生产者组中的多个生产者。(其实)
Step6
增加默认的Topic信息。并且启动 MQClientInstance
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}MQClientManager类
MQClientManager 是用来维护 MQClientInstance 的。一个JVM实例中只会存在一个 MQClientInstance 实例,同时也为 MQClientManager 的实现是在是太轻量化了,所以RocketMQ的作者yukong大佬也是直接使用了饿汉式的单例模式。
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
private MQClientManager() {
}
public static MQClientManager getInstance() {
return instance;
}
}同时内部维护了一套 ClientID 和 MQClientInstance 的对应关系,并且提供了方法去添加、移除,这里不做更多的介绍。
写在最后
讲道理写生产者启动流程还是蛮累的。本以为会比较轻松,因为相对于一些其他流程来说应该是相对比较简单的,没想到还是花了两天的时间陆陆续续的写完。
现在是6月27日的晚上9点45分,周日,没想到一周唯一的休息日就这样在睡觉中度过去了(是真的睡了一天),最后到了晚上吃完饭、吃完水果才想到博客没有写完,今天施队还说我吃饭吃的太少,水果吃的多,怕是会的糖尿病,整个人估计是废了吧。


