2022-09-22 20:56:31
RabbitMQ、RocketMQ、Kafka 实现延迟队列的方式及原理如下:
RabbitMQ实现方式:通过 DLX(死信交换机) 和 TTL(消息过期时间) 间接实现延迟队列。
TTL 设置方式:
队列级 TTL:通过参数 x-message-ttl 设置,单位为毫秒,队列中所有消息共享同一过期时间。
Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 6000); // 6秒channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);消息级 TTL:通过 setExpiration 方法单独设置每条消息的过期时间。
AMQP.BasicProperties properties = new AMQP.BasicProperties();properties.setDeliveryMode(2);properties.setExpiration("60000"); // 60秒channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());优先级规则:若同时设置队列级和消息级 TTL,以较小值为准。
DLX 死信交换机:
触发条件:消息被拒绝(reject 且 requeue=false)、消息过期、队列达到最大长度。
配置参数:
x-dead-letter-exchange:指定死信交换机名称。
x-dead-letter-routing-key(可选):指定死信路由键,未设置则使用原队列路由键。
Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);实现原理:消息发送至普通队列后,若未被消费且达到 TTL 时间,则被转发至绑定的 DLX 死信队列。通过不同延迟级别的队列组合,可实现多级延迟效果。

实现原理:
队列划分:Broker 根据延迟级别创建多个队列,每个队列对应一个级别。
定时任务:Broker 为每个延迟级别启动独立定时任务,轮询队列中的消息。
消息转发:当消息达到延迟时间后,Broker 将其转发至目标 Topic 的队列中。
有序性保障:相同延迟级别的消息存储在同一队列,保证消费顺序。
消息重试原理:消费失败的消息会被投递至延迟队列的 Topic,到期后重新转发至原 Topic 进行重试。

实现方式:原生不支持延迟队列,需通过 手动实现 模拟功能,通常采用 固定延迟级别 方案。
实现步骤:
创建延迟 Topic:设置固定数量的 Partition(如 18 个),每个 Partition 对应一个延迟级别。
发送消息:
根据延迟参数选择目标 Partition。
将原 Topic 信息存入消息 Header(如 origin_topic)。
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>( "delay_topic", delayPartition, delayTime, data);producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));消费延迟消息:
专用 Consumer Group 订阅延迟 Topic。
若消息未达延迟时间,调用 pause() 暂停消费,并通过 seek() 重置 Offset,同时启动定时器轮询。
达到延迟时间后,调用 resume() 恢复消费,从 Header 中获取原 Topic 并转发消息。
关键设计:
避免 Rebalance:通过 pause()/resume() 控制消费进度,防止因超时未消费触发 Rebalance(默认 max.poll.interval.ms=300s)。
有序性保障:相同延迟级别的消息存储在同一 Partition,保证消费顺序。
