RocketMQ整合SpringBoot常用操作
一、RocketMQ简介
RocketMQ是阿里开源的分布式消息中间件。
核心概念:
- Producer:消息生产者
- Consumer:消息消费者
- Broker:消息服务器
- NameServer:路由注册中心
- Topic:消息主题
- Queue:消息队列
二、SpringBoot整合RocketMQ
添加依赖
1 | <dependency> |
配置
1 | rocketmq: |
三、同步发送
1 |
|
四、异步发送
1 | /** |
五、单向发送
不等待应答,不关心发送结果。
1 | /** |
适用场景:日志采集等允许丢失的场景。
六、延迟消息
1 | /** |
延迟级别:
- 1s
- 5s
- 10s
- 30s
- 1min
- 2min
- 3min
- 4min
- 5min
- 6min
- 7min
- 8min
- 9min
- 10min
- 20min
- 30min
- 1h
- 2h
七、顺序消息
发送顺序消息
1 | /** |
消费顺序消息
1 |
|
八、消费消息
Push模式
1 |
|
消费对象消息
1 |
|
广播消费
1 |
|
九、事务消息
事务消息生产者
1 | /** |
事务监听器
1 |
|
十、消息过滤
Tag过滤
1 | // 发送时指定tag |
SQL92过滤
1 |
发送时设置属性:
1 | Message<?> message = MessageBuilder.withPayload(data) |
十一、消息重试和死信
消费重试
1 |
消费失败返回RECONSUME_LATER:
1 |
|
死信队列
超过最大重试次数后,消息进入死信队列:
- Topic:%DLQ% + ConsumerGroup
- 需要单独消费处理
十二、最佳实践
生产者
- 同步发送保证可靠性
- 合理设置超时和重试
- 设置合理的Key便于排查
1 | Message<?> message = MessageBuilder.withPayload(data) |
消费者
- 幂等处理,防止重复消费
- 尽早确认,避免消息堆积
- 处理失败返回重试
通用
- 合理规划Topic和Tag
- 监控消息堆积
- 控制消息大小
十三、总结
RocketMQ整合SpringBoot要点:
- 三种发送方式:同步、异步、单向
- 顺序消息保证消费顺序
- 事务消息保证最终一致性
- Tag和SQL92过滤消息
- 消费重试和死信队列处理
- 幂等消费保证可靠性
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 夏天的风吹向哪里!
