如何在 RabbitMQ 中建立延遲佇列

首先,我們需要設定兩個基本通道,一個用於主佇列,另一個用於延遲佇列。在最後的示例中,我新增了一些不需要的附加標誌,但使程式碼更可靠; 如 confirm deliverydelivery_modedurable。你可以在 RabbitMQ 手冊中找到有關這些的更多資訊。

在我們設定了通道後,我們新增了一個繫結到主通道,我們可以使用它來將訊息從延遲通道傳送到主佇列。

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

接下來,我們需要配置延遲通道,以便在訊息到期後將訊息轉發到主佇列。

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (訊息 - 生存時間)

    這通常用於在特定持續時間後自動刪除佇列中的舊訊息,但通過新增兩個可選引數,我們可以更改此行為,而是使用此引數以毫秒為單位確定訊息在延遲佇列中保留多長時間。

  • X-死信路由鍵

    此變數允許我們在訊息過期後將訊息傳輸到其他佇列,而不是完全刪除它的預設行為。

  • X-死信交換

    此變數確定用於將訊息從 hello_delay 傳輸到 hello 佇列的 Exchange。

釋出到延遲佇列

當我們完成設定所有基本 Pika 引數後,你只需使用基本釋出將訊息傳送到延遲佇列。

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

執行完指令碼後,你應該會在 RabbitMQ 管理模組中看到以下佇列。 在此處輸入影象描述

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")