您现在的位置是:首页 > 编程 > 

RocketMQ(三):集成SpringBoot

2025-07-27 06:57:42
RocketMQ(三):集成SpringBoot RocketMQ系列文章RocketMQ(一):基本概念和环境搭建RocketMQ(二):原生API快速入门RocketMQ(三):集成SpringBoot一、搭建环境需要创建两个服务,消息生产服务和消息消费者服务生产消息存在多个服务,消费则统一由一个服务处理这样可以做到解耦 pom.xml生产者和消费者都需要代码语言:javascript代码运行

RocketMQ(三):集成SpringBoot

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot

一、搭建环境
  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
代码语言:javascript代码运行次数:0运行复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
代码语言:javascript代码运行次数:0运行复制
rocketmq:
    name-server: 127.0.0.1:9876     # rocketMq的nameServer地址
    producer:
        group: boot-producer-group        # 生产者组别
        send-message-timeout: 000  # 消息发送的超时时间
        retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数
        max-message-size: 419404       # 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
代码语言:javascript代码运行次数:0运行复制
rocketmq:
    name-server: localhost:9876
二、不同类型消息

直接引入即可

代码语言:javascript代码运行次数:0运行复制
@Autowired
private RocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,通知等
代码语言:javascript代码运行次数:0运行复制
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型
    • MessageExt类型是消息的所有内容
    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "bootTestTopic", cumerGroup = "boot-test-cumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        println(new String(message.getBody()));
    }
}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
代码语言:javascript代码运行次数:0运行复制
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        println("成功");
    }
    @Override
    public void onException(Throwable throwable) {
        println("失败" + throwable.getMessage());
    }
});

、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
代码语言:javascript代码运行次数:0运行复制
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 0s 1m 2m m 4m 5m 6m 7m 8m 9m 10m 20m 0m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是0s后被监听消费
代码语言:javascript代码运行次数:0运行复制
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 000, 4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
代码语言:javascript代码运行次数:0运行复制
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("qwer", 1, "下单"),
        new MsgModel("qwer", 1, ""),
        new MsgModel("qwer", 1, "物流"),

        new MsgModel("zxcv", 2, "下单"),
        new MsgModel("zxcv", 2, ""),
        new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
    // 发送  一般都是以json的方式进行处理
    // 根据第三个参数计算hash值决定消息放入哪个队列
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", (msgModel), msgModel.getOrderSn());
});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
        cumerGroup = "boot-orderly-cumer-group",
        cumeMode = CumeMode.ORDERLY, // 顺序消费模式 单线程
        maxRecumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSO.parseObject(new String(message.getBody()), );
        println(msgModel);
    }
}

6、带tag消息

  • tag带在主题后面用:来携带
代码语言:javascript代码运行次数:0运行复制
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");

7、带key消息

代码语言:javascript代码运行次数:0运行复制
Message<String> message = MessageBuilder
        .withPayload("我是一个带key的消息")
        .setHeader(RocketMQHeaders.KEYS, "10086")
        .build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "bootTagTopic",
        cumerGroup = "boot-tag-cumer-group",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式
//        selectorExpression = "a in (,5,7)" // 中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        println("获取keys: " + message.getKeys());
        println("消息内容: " + new String(message.getBody()));
    }
}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取
三、消息消费两种模式
  • Rocketmq消息消费的模式分为两种:负载均衡模式广播模式
  • 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
  • 广播模式表示每个消费者都消费一遍订阅的主题的消息

1、负载均衡模式

  • 队列会被消费者分摊
  • 队列数量应该>=消费者数量,否则多出来的消费者永远接收不到消息
  • mq服务器会记录消息的消费点位(即消息是否被消费)

创建多个消费者监听同一个主题

代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-a",
        messageModel = MessageModel.CLUSTERIG, // 集模式(负载均衡)
)
public class DC1 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-a组的第一个消费者:" + message);
    }
}

@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-a",
        messageModel = MessageModel.CLUSTERIG // 集模式(负载均衡)
)
public class DC2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-a组的第二个消费者:" + message);

    }
}

@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-a",
        messageModel = MessageModel.CLUSTERIG // 集模式(负载均衡)
)
public class DC implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-a组的第三个消费者:" + message);

    }
}

生产者发送多条消息

代码语言:javascript代码运行次数:0运行复制
@Test
public void modeTest() throws Exception {
    for (int i = 1; i <= 10; i++) {
        rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
    }
}

执行结果:

代码语言:javascript代码运行次数:0运行复制
我是mode-cumer-group-a组的第一个消费者:我是第4个消息
我是mode-cumer-group-a组的第一个消费者:我是第8个消息
我是mode-cumer-group-a组的第三个消费者:我是第个消息
我是mode-cumer-group-a组的第三个消费者:我是第7个消息
我是mode-cumer-group-a组的第二个消费者:我是第2个消息
我是mode-cumer-group-a组的第二个消费者:我是第6个消息
我是mode-cumer-group-a组的第二个消费者:我是第10个消息
我是mode-cumer-group-a组的第一个消费者:我是第1个消息
我是mode-cumer-group-a组的第一个消费者:我是第5个消息
我是mode-cumer-group-a组的第一个消费者:我是第9个消息

第一个消费者消费了5个消息,第二个消费了个,第三个消费了2个,为什么没有平均开?

  • 10个消息,平均发送到4个队列
  • 第一个消费者接收队列0和队列1的消息
  • 第二个消费者接收队列2的消息
  • 第三个消费者接收队列的消息
  • 如果是四个消费者,则每个消费者只能接收其中之一队列的消息
  • 如果是五个消费者,那么第五个消费者哪个队列的消息都不能接收到
  • 所谓的负载均衡则是队列被平分,而不是消息
  • 代理者点位:可以认为此队列接收的消息数量
  • 消费者点位:可以认为是已经消费的消息数量
  • 差值:代理者点位-消费者点位,待处理的消息数量

2、广播模式

  • 消费者组订阅的主题接收到的消息,每个消费者都会消费
  • mq服务器不会记录消息的消费点位(即消息是否被消费永远未知)

创建多个消费者监听同一个主题

  • 一个springboot服务配置了多个相同组和主题的cumer
    • 需要指定唯一instanceame
    • 实现RocketMQPushCumerLifecycleListener接口
  • 否则报错The cumer group has been created before, specify another name please
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-b",
        messageModel = MessageModel.BROADCASTIG // 广播模式
)
public class DC4 implements RocketMQListener<String>, RocketMQPushCumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-b组的第一个消费者:" + message);
    }

    @Override
    public void prepareStart(DefaultMQPushCumer defaultMQPushCumer) {
        defaultMQPushCumer.setInstanceame("第一个消费者");
    }
}

@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-b",
        messageModel = MessageModel.BROADCASTIG // 广播模式
)
public class DC5 implements RocketMQListener<String>, RocketMQPushCumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-b组的第二个消费者:" + message);
    }
    @Override
    public void prepareStart(DefaultMQPushCumer defaultMQPushCumer) {
        defaultMQPushCumer.setInstanceame("第二个消费者");
    }
}

@Component
@RocketMQMessageListener(topic = "modeTopic",
        cumerGroup = "mode-cumer-group-b",
        messageModel = MessageModel.BROADCASTIG // 广播模式
)
public class DC6 implements RocketMQListener<String>, RocketMQPushCumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        println("我是mode-cumer-group-b组的第三个消费者:" + message);
    }

    @Override
    public void prepareStart(DefaultMQPushCumer defaultMQPushCumer) {
        defaultMQPushCumer.setInstanceame("第三个消费者");
    }
}

生产者发送多条消息

代码语言:javascript代码运行次数:0运行复制
@Test
public void modeTest() throws Exception {
    for (int i = 1; i <= ; i++) {
        rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
    }
}

执行结果:

  • 生产者发送条消息
  • 三个消费者,每个都会消费这条消息
  • 所以,消费者点位也没法移动,索性就不动了
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:202-11-22,如有侵权请联系 cloudcommunity@tencent 删除队列服务异步负载均衡rocketmq

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1161443.html

相关标签:无
上传时间: 2025-07-20 16:44:35
留言与评论(共有 18 条评论)
本站网友 龟版胶
21分钟前 发表
selectorType = SelectorType.TAG
本站网友 linaro
23分钟前 发表
可以设置为单线程顺序模式设置消费重试次数代码语言:javascript代码运行次数:0运行复制@Component @RocketMQMessageListener(topic = "bootOrderlyTopic"
本站网友 今日美金汇率
28分钟前 发表
需要设置不同的消费者组代码语言:javascript代码运行次数:0运行复制rocketmq
本站网友 推荐几个成人网站
6分钟前 发表
msgModel.getOrderSn()); }); 消费消息 默认是并发消费模式
本站网友 营养减肥食谱
14分钟前 发表
"物流") ); msgModels.forEach(msgModel -> { // 发送 一般都是以json的方式进行处理 // 根据第三个参数计算hash值决定消息放入哪个队列 rocketMQTemplate.syncSendOrderly("bootOrderlyTopic"
本站网友 工商企业管理主修课程
3分钟前 发表
boot-producer-group # 生产者组别 send-message-timeout
本站网友 trd
13分钟前 发表
" + message); } } 生产者发送多条消息 代码语言:javascript代码运行次数:0运行复制@Test public void modeTest() throws Exception { for (int i = 1; i <= 10; i++) { rocketMQTemplate.syncSend("modeTopic"
本站网友 浩方改建
25分钟前 发表
cumerGroup = "mode-cumer-group-b"
本站网友 手足口病的症状
26分钟前 发表
标签keys从消息头里面获取三
本站网友 乳加力
19分钟前 发表
1
本站网友 杭州城西银泰城地址
30分钟前 发表
"下单")
本站网友 赵建军
13分钟前 发表
等级1就对应1s
本站网友 防水防腐
17分钟前 发表
带tag消息tag带在主题后面用
本站网友 老鼠肉可以吃吗
6分钟前 发表
1
本站网友 百年人寿官方网站
22分钟前 发表
7)" // 中开启enbalePropertyFilter=true ) public class CTagMsgListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { println("获取keys
本站网友 云南爱因森软件职业学院
16分钟前 发表
message); 获取带key和tag的消费者 过滤模式有两种:正则表达式和sql92方式keys从MessageExt对象中获取代码语言:javascript代码运行次数:0运行复制@Component @RocketMQMessageListener(topic = "bootTagTopic"
本站网友 莱州租房
6分钟前 发表
没有消息头内容(keys