您现在的位置是:首页 > 编程 > 

RocketMQ(四):重复消费、消息重试、死信消息的解决方案

2025-07-26 20:43:38
RocketMQ(四):重复消费、消息重试、死信消息的解决方案 RocketMQ系列文章RocketMQ(一):基本概念和环境搭建RocketMQ(二):原生API快速入门RocketMQ(三):集成SpringBootRocketMQ(四):重复消费、消息重试、死信消息的解决方案一、重复消费1、消息重复的情况发送时消息重复 当一条消息已被成功发送到服务端并完成持久化此时出现了网络闪断或者客户端

RocketMQ(四):重复消费、消息重试、死信消息的解决方案

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot

RocketMQ(四):重复消费、消息重试、死信消息的解决方案

一、重复消费

1、消息重复的情况

  • 发送时消息重复
    • 当一条消息已被成功发送到服务端并完成持久化
    • 此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败
    • 如果此时生产者意识到消息发送失败并尝试再次发送消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 投递时消息重复
    • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断
    • 为了保证消息至少被消费一次
    • 消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
    • 当消息队列RocketMQ的Broker 或客户端重启、扩容或缩容
    • 会触发 Rebalance,此时消费者可能会收到重复消息

2、MySql唯一索引

  • 因为 Message ID 有可能出现冲突(重复)的情况
  • 所以用业务唯一标识作为幂等处理的关键依据

生产者

  • 相同的唯一业务编号,发送两次
代码语言:javascript代码运行次数:0运行复制
@Test
void test1() {
    // 业务唯一编号
    String key = "100";
    Message<String> message = MessageBuilder
            .withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    // 相同的key发送两次
    rocketMQTemplate.syncSend("repeatedTopic", message);
    rocketMQTemplate.syncSend("repeatedTopic", message);
    println("发送完成");
}

消费者

  • 创建user表结构,num_no字段设置为唯一索引
  • 当唯一的业务id插入唯一索引的num_no字段
  • 只能插入一次,第二次会报唯一索引重复
  • 当获取到重复数据,直接返回即可,就不在执行业务代码
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "repeatedTopic", cumerGroup = "repeated-cumer-group")
public class RepeatMysqlListener implements RocketMQListener<MessageExt> {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void onMessage(MessageExt message) {
        // 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
        String messageKey = message.getKeys();
        try {
            ("ISERT ITO `user` (`num_no`,`name`) VALUES('" + messageKey + "','名称')");
        } catch (DataAccessException e) {
            // 该message可能是重复的
            if (e instanceof DuplicateKeyException) {
                println(messageKey+"的业务编号数据重复了,直接return,就算消费了此重复数据");
                return;
            }
        }
        // 获取消息执行业务
        println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
    }
}

执行结果:

代码语言:javascript代码运行次数:0运行复制
发送完成
获取消息内容:【我是一个带key的消息】执行业务
100的业务编号数据重复了,直接return,就算消费了此重复数据

、redis分布式锁

Redisson分布式锁配置

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class RedissonConfig {
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:690")
                .setPassword("xc@124")
                .setDatabase(0);
        return (Redisson) (config);
    }
}

生产者

代码语言:javascript代码运行次数:0运行复制
@Test
void test1() {
    // 业务唯一编号
    String key = "1400";
    Message<String> message = MessageBuilder
            .withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    // 相同的key发送两次
    rocketMQTemplate.syncSend("repeatedTopic", message);
    rocketMQTemplate.syncSend("repeatedTopic", message);
    println("发送完成");
}

消费者

  • 因为消费者是多线程并发消费
  • 如果遇到相同的唯一业务id,则上锁依次执行
  • 将执行过的唯一业务id放入redis
  • 下次相同业务id进入与redis集合对比,存在则证明已经执行过了
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "repeatedTopic", cumerGroup = "repeated-cumer-group")
public class RepeatRedisListener implements RocketMQListener<MessageExt> {

    @Autowired
    private Redisson redisson;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void onMessage(MessageExt message) {
        // 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
        String messageKey = message.getKeys();
        RLock redissonLock = redisson.getLock(messageKey);
        try {
            // 添加redisson锁并实现锁续命功能
            // 默认过期时间是0s,每10s触发一次锁续命功能
            redissonLock.lock();
            List<String> topicBusinessKeyList = ().range("topicBusinessKey",0,-1);
            if ( ObjectUtils.isotEmpty(topicBusinessKeyList) && (messageKey)) {
                println(messageKey + "的业务编号数据重复了,直接return,就算消费了此重复数据");
                return;
            }
            // 获取消息执行业务
            println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
            // 讲businessKey存入redis
            ().rightPush("topicBusinessKey", messageKey);
        } finally {
            redissonLock.unlock();
        }
    }
}

执行结果:

代码语言:javascript代码运行次数:0运行复制
发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据
二、消息重试

1、生产者重试

  • 可以分别设置同步消息和异步消息发送的重试次数
  • 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
  • 默认重试间隔时间为 1 秒,次数为2次
  • 发送消息超时时间默认000毫秒,如果因为超时,那么便不再尝试重试

application.yml配置文件设置

2、消费者重试

  • 默认的重试间隔:10s 0s 1m 2m m 4m 5m 6m 7m 8m 9m 10m 20m 0m 1h 2h
  • 默认多线程模式下,重试16次,设置超过 16 次的重试时间间隔均为每次 2 小时
  • 某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
  • 在单线程的顺序模式下,重试Integer.MAX_VALUE次,间隔1秒

消费者配置

  • 实现RocketMQPushCumerLifecycleListener接口,从prepareStart方法中获取消费者并设置它
  • 消息最大重试次数的设置对相同GroupID下的所有Cumer实例有效
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(topic = "retryTopic",
        cumerGroup = "retry-cumer-group"
)
public class RetryListener implements RocketMQListener<MessageExt>, RocketMQPushCumerLifecycleListener {
    @Override
    public void onMessage(MessageExt message) {
        println("当前时间: " + new Date());
        //获取消息的重试次数
        println("重试次数: " + message.getRecumeTimes());
        println("接收内容: " + new String(message.getBody()));
        println("----------------------------------------------");
        throw new RuntimeException("测试重试次数");
    }

    @Override
    public void prepareStart(DefaultMQPushCumer defaultMQPushCumer) {
        // 设置消费者重试次数
        defaultMQPushCumer.setMaxRecumeTimes(2);
        // 实例名称-控制面板可以看到
        defaultMQPushCumer.setInstanceame("消费者1号");
    }
}

设置重试二次的执行结果:

三、死信消息
  • 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
  • 死信队列是死信Topic下分区数唯一的单独队列
  • 死信Topic名称为%DLQ%原消费者组名,死信队列的消息将不会再被消费

上一节的消费者重试两次后,就会将消息放入死信队列

处理死信消息方式一:

  • 监听死信队列处理消息
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(
        topic = "%DLQ%retry-cumer-group",
        cumerGroup = "retry-dead-cumer-group"
)
public class RetryDeadCumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 处理消息 签收了
        println("记录到特别的位置 文件 mysql 通知人工处理");
    }
}

处理死信消息方式二:

  • 控制重试次数,重试几次后,直接记录到数据库等等
代码语言:javascript代码运行次数:0运行复制
@Component
@RocketMQMessageListener(
        topic = "%DLQ%retry-cumer-group",
        cumerGroup = "retry-dead-cumer-group"
)
public class RetryDeadCumer2 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // 业务处理
        try {
            int i = 1 / 0;
        } catch (Exception e) {
            // 重试
            int recumeTimes = messageExt.getRecumeTimes();
            if (recumeTimes >= ) {
                // 不要重试了
                println("记录到特别的位置 文件 mysql 通知人工处理");
            }else {
                throw new RuntimeException("异常");
            }
        }
    }
}
四、消费堆积

一般认为单条队列消息差值>=10w时 算堆积问题

什么情况下会出现堆积

  • 生产太快
    • 生产方可以做业务限流
    • 增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
  • 消费者消费出现问题
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2024-02-21,如有侵权请联系 cloudcommunity@tencent 删除队列服务端解决方案数据rocketmq

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1161436.html

相关标签:无
上传时间: 2025-07-20 16:43:40
留言与评论(共有 17 条评论)
本站网友 工效挂钩
15分钟前 发表
则上锁依次执行将执行过的唯一业务id放入redis下次相同业务id进入与redis集合对比
本站网友 吃石榴上火吗
14分钟前 发表
此时消费者可能会收到重复消息2
本站网友 酷派5860s
22分钟前 发表
消息重复的情况发送时消息重复 当一条消息已被成功发送到服务端并完成持久化此时出现了网络闪断或者客户端宕机
本站网友 密码泄露
29分钟前 发表
死信消息当消费重试到达阈值以后
本站网友 樱桃红第二部
29分钟前 发表
消息已投递到消费者并完成业务处理
本站网友 假奶粉
9分钟前 发表
此时消费者可能会收到重复消息2
本站网友 雯靖
3分钟前 发表
即消费失败后
本站网友 虚拟打印机破解版
15分钟前 发表
如果因为超时
本站网友 argosy
7分钟前 发表
则keys值一定相同) String messageKey = message.getKeys(); RLock redissonLock = redisson.getLock(messageKey); try { // 添加redisson锁并实现锁续命功能 // 默认过期时间是0s
本站网友 航天晨光股票
26分钟前 发表
直接return
本站网友 解密ufo
8分钟前 发表
直接返回即可
本站网友 numberlock
15分钟前 发表
message); rocketMQTemplate.syncSend("repeatedTopic"
本站网友 男人为什么会蛋疼
28分钟前 发表
则keys值一定相同) String messageKey = message.getKeys(); try { ("ISERT ITO `user` (`num_no`
本站网友 丽都壹号
19分钟前 发表
死信消息的解决方案一
本站网友 拂袖而去
4分钟前 发表
分享自作者个人站点/博客
本站网友 静海二手房出售
24分钟前 发表
消费堆积一般认为单条队列消息差值>=10w时 算堆积问题 什么情况下会出现堆积 生产太快 生产方可以做业务限流增加消费者数量