RabbitMQ、RocketMQ、Kafka延迟队列实现

兄弟姐妹们在线分析下,RabbitMQ、RocketMQ、Kafka延迟队列实现
最新回答
一白遮

2022-09-22 20:56:31

RabbitMQ、RocketMQ、Kafka 实现延迟队列的方式及原理如下:

RabbitMQ
  • 实现方式:通过 DLX(死信交换机)TTL(消息过期时间) 间接实现延迟队列。

  • TTL 设置方式

    队列级 TTL:通过参数 x-message-ttl 设置,单位为毫秒,队列中所有消息共享同一过期时间。

    Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 6000); // 6秒channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

    消息级 TTL:通过 setExpiration 方法单独设置每条消息的过期时间。

    AMQP.BasicProperties properties = new AMQP.BasicProperties();properties.setDeliveryMode(2);properties.setExpiration("60000"); // 60秒channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

    优先级规则:若同时设置队列级和消息级 TTL,以较小值为准。

  • DLX 死信交换机

    触发条件:消息被拒绝(reject 且 requeue=false)、消息过期、队列达到最大长度。

    配置参数

    x-dead-letter-exchange:指定死信交换机名称。

    x-dead-letter-routing-key(可选):指定死信路由键,未设置则使用原队列路由键。

    Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  • 实现原理:消息发送至普通队列后,若未被消费且达到 TTL 时间,则被转发至绑定的 DLX 死信队列。通过不同延迟级别的队列组合,可实现多级延迟效果。

RocketMQ
  • 实现方式:原生支持延迟队列,但开源版本仅支持 18 个固定延迟级别(阿里云版本支持任意时间精度)。
  • 默认延迟级别:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。
  • 使用方法:通过 setDelayTimeLevel 设置延迟级别(从 1 开始计数)。
message.setDelayTimeLevel(3); // 10s延迟
  • 实现原理

    队列划分:Broker 根据延迟级别创建多个队列,每个队列对应一个级别。

    定时任务:Broker 为每个延迟级别启动独立定时任务,轮询队列中的消息。

    消息转发:当消息达到延迟时间后,Broker 将其转发至目标 Topic 的队列中。

    有序性保障:相同延迟级别的消息存储在同一队列,保证消费顺序。

  • 消息重试原理:消费失败的消息会被投递至延迟队列的 Topic,到期后重新转发至原 Topic 进行重试。

Kafka
  • 实现方式:原生不支持延迟队列,需通过 手动实现 模拟功能,通常采用 固定延迟级别 方案。

  • 实现步骤

    创建延迟 Topic:设置固定数量的 Partition(如 18 个),每个 Partition 对应一个延迟级别。

    发送消息

    根据延迟参数选择目标 Partition。

    将原 Topic 信息存入消息 Header(如 origin_topic)。

    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>( "delay_topic", delayPartition, delayTime, data);producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));

    消费延迟消息

    专用 Consumer Group 订阅延迟 Topic。

    若消息未达延迟时间,调用 pause() 暂停消费,并通过 seek() 重置 Offset,同时启动定时器轮询。

    达到延迟时间后,调用 resume() 恢复消费,从 Header 中获取原 Topic 并转发消息。

  • 关键设计

    避免 Rebalance:通过 pause()/resume() 控制消费进度,防止因超时未消费触发 Rebalance(默认 max.poll.interval.ms=300s)。

    有序性保障:相同延迟级别的消息存储在同一 Partition,保证消费顺序。