如何使用 RabbitMQ 中的消息

首先导入库。

from amqpstorm import Connection

在使用消息时,我们首先需要定义一个函数来处理传入的消息。这可以是任何可调用的函数,并且必须采用消息对象或消息元组(取决于 start_consuming 中定义的 to_tuple 参数)。

除了处理来自传入消息的数据外,我们还必须确认或拒绝该消息。这很重要,因为我们需要让 RabbitMQ 知道我们正确接收并处理了消息。

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

接下来,我们需要建立与 RabbitMQ 服务器的连接。

connection = Connection('127.0.0.1', 'guest', 'guest')

之后我们需要建立一个频道。每个连接可以有多个通道,通常在执行多线程任务时,建议(但不要求)每个线程有一个。

channel = connection.channel()

设置好频道后,我们需要让 RabbitMQ 知道我们要开始使用消息。在这种情况下,我们将使用我们之前定义的 on_message 函数来处理所有消耗的消息。

我们将在 RabbitMQ 服务器上监听的队列将是 simple_queue,我们也告诉 RabbitMQ,一旦完成它们,我们将确认所有传入的消息。

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

最后,我们需要启动 IO 循环来开始处理 RabbitMQ 服务器提供的消息。

channel.start_consuming(to_tuple=False)