RocketMQ 生产者

本文最后更新于:2021年9月20日 上午

主要介绍了RocketMQ生产者最佳实践代码,以Java为例

RocketMQ 最佳实践

本文章的程序运行环境如下:

JKD1.8

RocketMQ-Client 4.3.0

RocketMQ-Server 4.8.0

NameServer Addr: 127.0.0.1:9876

前期准备

首先我们新建一个Demo类来存储所有的代码,我们使用默认的生产者实现 DefaultMQProducer 来操作,并且制定默认的生产者组,该构造方法的第二个参数是 RPCHookRPCHook 是一个接口,具体实现交由业务端实现,两个方法分别是:doBeforeRequestdoAfterResponse,表示在执行请求之前和接收返回之后分别执行相关逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class ProducerDemo {
private DefaultMQProducer producer;
private String topicName = "test-topic";

public ProducerDemo(String namesrvAdder, String producerGroupName) throws MQClientException {
// 实例化消息生产者Producer
// 这里是因为方便测试才这样做的,实际上应该由Ioc传递进来的
producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr(namesrvAdder);
producer.start();
}

public ProducerDemo(String namesrvAdder, String producerGroupName, String topicName) throws MQClientException {
this(namesrvAdder, producerGroupName);
this.topicName = topicName;
}

public void closeProducers() {
producer.shutdown();
}
}

我们再来新建一个测试类来测试我们的代码,并指定默认行为,以后测试的时候我们只需要使用producer就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ProducerDemoTest {
private ProducerDemo producer;

@Before
public void init() throws MQClientException {
producer = new ProducerDemo("localhost:9876", "test_producer");
}

@After
public void close() {
producer.closeProducers();
}
}

同步发送消息

同步发送消息,根据消息类型不同 增加了 tag、keys等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {

Message message = new Message(topicName, "sync,no tag".getBytes(Charset.defaultCharset()));
SendResult result = producer.send(message);
log.info("sync :{}\n", result);

Message hasTagMessage = new Message(topicName, "test_tag", "sync,has tag".getBytes(StandardCharsets.UTF_8));
result = producer.send(hasTagMessage);
log.info("has tag{}\n", result);

Message hasTagAndKeysMessage = new Message(topicName, "test_tag", "test_keys", "sync,has tag and keys".getBytes(StandardCharsets.UTF_8));
result = producer.send(hasTagAndKeysMessage);
log.info("has tag and key result:{}\n", result);
}

异步发送消息

异步发送消息,在 send 方法执行之后程序会继续往下执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void async() throws RemotingException, InterruptedException, MQClientException {
Message message = new Message(topicName, "async, no tag".getBytes(StandardCharsets.UTF_8));

producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("async result:{}", sendResult);
}

@Override
public void onException(Throwable throwable) {
log.error("async got exception:{}", throwable.toString());
}
});

Thread.sleep(2000);
}

批量发送消息

1
2
3
4
5
6
7
8
public void batchSend() throws Exception {
ArrayList<Message> list = new ArrayList<>();
list.add(new Message(topicName, "batch message no.1".getBytes(StandardCharsets.UTF_8)));
list.add(new Message(topicName, "batch message no.2".getBytes(StandardCharsets.UTF_8)));
list.add(new Message(topicName, "batch message no.3".getBytes(StandardCharsets.UTF_8)));
SendResult result = producer.send(list);
log.info("batch send got :{}", result);
}

指定消息队列发送消息

1
2
3
4
5
6
public void sendToMessageQueue() throws Exception {
Message message = new Message(topicName, "send 2 message queue".getBytes(StandardCharsets.UTF_8));
List<MessageQueue> queues = producer.fetchPublishMessageQueues(topicName);
MessageQueue queue = queues.get(queues.size() - 1);
producer.send(message, queue);
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!