如何在 RabbitMQ 中创建延迟队列

首先,我们需要设置两个基本通道,一个用于主队列,另一个用于延迟队列。在最后的示例中,我添加了一些不需要的附加标志,但使代码更可靠; 如 confirm deliverydelivery_modedurable。你可以在 RabbitMQ 手册中找到有关这些的更多信息。

在我们设置了通道后,我们添加了一个绑定到主通道,我们可以使用它来将消息从延迟通道发送到主队列。

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

接下来,我们需要配置延迟通道,以便在消息到期后将消息转发到主队列。

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (消息 - 生存时间)

    这通常用于在特定持续时间后自动删除队列中的旧消息,但通过添加两个可选参数,我们可以更改此行为,而是使用此参数以毫秒为单位确定消息在延迟队列中保留多长时间。

  • X-死信路由键

    此变量允许我们在消息过期后将消息传输到其他队列,而不是完全删除它的默认行为。

  • X-死信交换

    此变量确定用于将消息从 hello_delay 传输到 hello 队列的 Exchange。

发布到延迟队列

当我们完成设置所有基本 Pika 参数后,你只需使用基本发布将消息发送到延迟队列。

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

执行完脚本后,你应该会在 RabbitMQ 管理模块中看到以下队列。 在此处输入图像描述

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")