一、RocketMQ简介

RocketMQ是阿里开源的分布式消息中间件。

核心概念:

  • Producer:消息生产者
  • Consumer:消息消费者
  • Broker:消息服务器
  • NameServer:路由注册中心
  • Topic:消息主题
  • Queue:消息队列

二、SpringBoot整合RocketMQ

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

配置

1
2
3
4
5
6
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2

三、同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Service
public class MessageService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 同步发送
*/
public SendResult sendSync(String topic, String message) {
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());
}

/**
* 同步发送,带超时
*/
public SendResult sendSync(String topic, String message, long timeout) {
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout);
}

/**
* 同步发送,带超时和重试
*/
public SendResult sendSync(String topic, String message, long timeout, int retry) {
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, retry);
}
}

四、异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 异步发送
*/
public void sendAsync(String topic, String message) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功: {}", sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
});
}

五、单向发送

不等待应答,不关心发送结果。

1
2
3
4
5
6
/**
* 单向发送
*/
public void sendOneWay(String topic, String message) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());
}

适用场景:日志采集等允许丢失的场景。

六、延迟消息

1
2
3
4
5
6
7
8
9
/**
* 延迟消息
*/
public SendResult sendDelay(String topic, String message, int delayLevel) {
return rocketMQTemplate.syncSend(topic,
MessageBuilder.withPayload(message).build(),
3000,
delayLevel);
}

延迟级别:

  1. 1s
  2. 5s
  3. 10s
  4. 30s
  5. 1min
  6. 2min
  7. 3min
  8. 4min
  9. 5min
  10. 6min
  11. 7min
  12. 8min
  13. 9min
  14. 10min
  15. 20min
  16. 30min
  17. 1h
  18. 2h

七、顺序消息

发送顺序消息

1
2
3
4
5
6
7
8
/**
* 顺序消息(根据hashKey选择队列)
*/
public SendResult sendOrderly(String topic, String message, String hashKey) {
return rocketMQTemplate.syncSendOrderly(topic,
MessageBuilder.withPayload(message).build(),
hashKey);
}

消费顺序消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY
)
@Component
public class OrderMessageListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.info("顺序消费: {}", message);
}
}

八、消费消息

Push模式

1
2
3
4
5
6
7
8
9
10
11
12
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "test-consumer-group"
)
@Component
public class TestMessageListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.info("收到消息: {}", message);
}
}

消费对象消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@RocketMQMessageListener(
topic = "user-topic",
consumerGroup = "user-consumer-group",
selectorExpression = "tag1 || tag2"
)
@Component
public class UserMessageListener implements RocketMQListener<UserMessage> {

@Override
public void onMessage(UserMessage message) {
log.info("收到用户消息: {}", message);
}
}

广播消费

1
2
3
4
5
6
7
8
9
10
11
12
13
@RocketMQMessageListener(
topic = "broadcast-topic",
consumerGroup = "broadcast-consumer-group",
messageModel = MessageModel.BROADCASTING
)
@Component
public class BroadcastMessageListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.info("广播消费: {}", message);
}
}

九、事务消息

事务消息生产者

1
2
3
4
5
6
7
8
/**
* 发送事务消息
*/
public void sendTransactionMessage(String topic, String message, Object arg) {
rocketMQTemplate.sendMessageInTransaction(topic,
MessageBuilder.withPayload(message).build(),
arg);
}

事务监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@RocketMQTransactionListener
@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderService orderService;

/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder((OrderDTO) arg);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

/**
* 事务回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = (String) msg.getHeaders().get("orderId");
Order order = orderService.getById(orderId);
if (order != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

十、消息过滤

Tag过滤

1
2
3
4
5
6
7
8
9
// 发送时指定tag
rocketMQTemplate.convertAndSend("topic:tag1", message);

// 消费时过滤tag
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "group",
selectorExpression = "tag1 || tag2"
)

SQL92过滤

1
2
3
4
5
6
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "group",
selectorType = SelectorType.SQL92,
selectorExpression = "age > 18 and type = 'VIP'"
)

发送时设置属性:

1
2
3
4
5
Message<?> message = MessageBuilder.withPayload(data)
.setHeader("age", 20)
.setHeader("type", "VIP")
.build();
rocketMQTemplate.syncSend("topic", message);

十一、消息重试和死信

消费重试

1
2
3
4
5
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "group",
maxReconsumeTimes = 3 // 最大重试次数
)

消费失败返回RECONSUME_LATER:

1
2
3
4
5
6
7
8
9
@Override
public ConsumeConcurrentlyStatus onMessage(MessageExt msg) {
try {
// 处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

死信队列

超过最大重试次数后,消息进入死信队列:

  • Topic:%DLQ% + ConsumerGroup
  • 需要单独消费处理

十二、最佳实践

生产者

  • 同步发送保证可靠性
  • 合理设置超时和重试
  • 设置合理的Key便于排查
1
2
3
Message<?> message = MessageBuilder.withPayload(data)
.setHeader(RocketMQHeaders.KEYS, orderId)
.build();

消费者

  • 幂等处理,防止重复消费
  • 尽早确认,避免消息堆积
  • 处理失败返回重试

通用

  • 合理规划Topic和Tag
  • 监控消息堆积
  • 控制消息大小

十三、总结

RocketMQ整合SpringBoot要点:

  • 三种发送方式:同步、异步、单向
  • 顺序消息保证消费顺序
  • 事务消息保证最终一致性
  • Tag和SQL92过滤消息
  • 消费重试和死信队列处理
  • 幂等消费保证可靠性