如何在PHP中使用RabbitMQ

tech2022-09-23  124

AMQP (Advanced Message Queueing Protocol) is a network protocol that can deliver messages from one application endpoint to another application endpoint. It does not care about the platform or language of said applications, as long as they support AMQP.

AMQP ( 高级消息队列协议 )是一种网络协议,可以将消息从一个应用程序端点传递到另一个应用程序端点。 只要它们支持AMQP,它就不会在乎这些应用程序的平台或语言。

Essentially, one of the application endpoints sends a message, thus being a Producer, through an AMQP broker. In turn the broker will deliver the message to another application endpoint, called the Consumer.

本质上,应用程序端点之一通过AMQP代理发送消息,从而成为生产者。 反过来,代理会将消息传递到另一个应用程序终结点,称为“消费者”。

RabbitMQ is an AMQP broker that has support for several programming languages, such as PHP.

RabbitMQ是一个AMQP代理,它支持多种编程语言,例如PHP。

The advantage of having a message broker such as RabbitMQ, and AMQP being a network protocol, is that the producer, the broker, and the consumer can live on different physical/virtual servers on different geographic locations.

使用消息代理(例如RabbitMQ和AMQP作为网络协议)的好处是,生产者,代理和消费者可以生活在不同地理位置的不同物理/虚拟服务器上。

Also, since networks are unreliable, and applications might fail to process a message completely, AMQP supports message acknowledgements, either automatically or when an application endpoint decides to send them.

另外,由于网络不可靠,并且应用程序可能无法完全处理消息,因此AMQP自动或在应用程序端点决定发送消息时支持消息确认。

RabbitMQ implements the AMQP 0-9-1 protocol. At a glance, it follows the following model.

RabbitMQ实现AMQP 0-9-1协议。 乍一看,它遵循以下模型。

词汇表 (Glossary)

ConceptDefinitionIconProducerApplication endpoint that sends messages ConsumerApplication endpoint that receives messagesConnectionHandles protocol, errors, authentication, etc… The connection is done using TCP protocol.–ChannelConnections are multiplexed through channels. Even though all channels share the same tcp connection, communication from one channel is completely independent of another.–ExchangeReceives messages from producers and pushes them to queues. Depending on the situation, this can be transparent to the developer. QueueBuffer that stores messagesMessagePiece of arbitrary information conforming to the AMQP format that is sent from the producer to a consumer through the broker. The broker cannot modify the information inside the message.–AcknowledgementsNotice sent back from the consumer to tell the server that a message has been received and processed, so the server can delete it from the queue.– 概念 定义 图标 制片人 发送消息的应用程序端点 消费者 接收消息的应用程序端点 连接 处理协议,错误,身份验证等…连接是使用TCP协议完成的。 – 渠道 连接通过通道多路复用。 即使所有通道共享相同的tcp连接,来自一个通道的通信也完全独立于另一个通道。 – 交换 接收来自生产者的消息并将其推入队列。 根据情况,这对开发人员可能是透明的。 队列 存储消息的缓冲区 信息 一条符合AMQP格式的任意信息,该信息从生产者通过经纪人发送给消费者。 代理无法修改消息内的信息。 – 致谢 使用者发送回的通知,告知服务器已接收并处理了一条消息,因此服务器可以将其从队列中删除。 –

Another advantage of AMQP 0-9-1 is that the application defines the routing logic instead of a broker administrator. This gives the developer a lot of flexibility, without the need to learn a new programming/scripting/markup language.

AMQP 0-9-1的另一个优点是应用程序定义了路由逻辑,而不是代理管理员。 这为开发人员提供了很大的灵活性,而无需学习新的编程/脚本/标记语言。

You can learn more about AMQP and RabbitMQ at the “AMQP 0-9-1 Model Explained” guide. Although not necessary for this tutorial, I encourage you to read it completely.

您可以在“ AMQP 0-9-1 Model Explained ”指南中了解有关AMQP和RabbitMQ的更多信息。 尽管本教程不是必需的,但我建议您完整阅读它。

安装RabbitMQ (Installing RabbitMQ)

You can find detailed documentation for your platform here, but roughly for Ubuntu it goes as follows:

您可以在此处找到有关您的平台的详细文档,但是对于Ubuntu大致如下:

sudo vim /etc/apt/sources.list

Add the following line to the file: deb https://www.rabbitmq.com/debian/ testing main

deb https://www.rabbitmq.com/debian/ testing main下行添加到文件中: deb https://www.rabbitmq.com/debian/ testing main

After this, just run the following commands

之后,只需运行以下命令

wget https://www.rabbitmq.com/rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmqctl status

You should see something like the following:

您应该看到类似以下的内容:

sudo rabbitmqctl status Status of node 'rabbit@localhost-sprabbitmq-926807' ... [{pid,790}, {running_applications,[{rabbit,"RabbitMQ","3.3.4"}, {os_mon,"CPO CXC 138 46","2.2.14"}, {mnesia,"MNESIA CXC 138 12","4.11"}, {xmerl,"XML parser","1.3.5"}, {sasl,"SASL CXC 138 11","2.3.4"}, {stdlib,"ERTS CXC 138 10","1.19.4"}, {kernel,"ERTS CXC 138 10","2.16.4"}]}, {os,{unix,linux}}, {erlang_version,"Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]\n"}, {memory,[{total,53400096}, {connection_procs,2704}, {queue_procs,34432}, {plugins,0}, {other_proc,13983664}, {mnesia,64504}, {mgmt_db,0}, {msg_index,26056}, {other_ets,770880}, {binary,16112}, {code,16372077}, {atom,594537}, {other_system,21535130}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,12677506662}, {disk_free_limit,50000000}, {disk_free,899366912}, {file_descriptors,[{total_limit,199900}, {total_used,5}, {sockets_limit,179908}, {sockets_used,1}]}, {processes,[{limit,1048576},{used,133}]}, {run_queue,0}, {uptime,34}] ...done.

If you get an error message, the server probably needs to be started with the following command

如果收到错误消息,则可能需要使用以下命令启动服务器

sudo invoke-rc.d rabbitmq-server start * Starting message broker rabbitmq-server ...done.

RabbitMQ defines a default user:

RabbitMQ定义了一个默认用户:

Username: guest

用户名: guest

Password: guest

密码: 访客

Be aware that this user will only be able to be used if connecting to RabbitMQ from localhost. For a true distributed system, you will have to define users and roles. You can read more on Access Control in the documentation. For the following examples we will be using the above credentials.

请注意,只有从本地主机连接到RabbitMQ时,才能使用此用户。 对于真正的分布式系统,您将必须定义用户和角色。 您可以在文档中阅读有关访问控制的更多信息 。 对于以下示例,我们将使用以上凭据。

In order to be able to integrate your php application with RabbitMQ, you’ll need the php-amqplib library. Getting it is easy with composer, just define the requirement inside your composer.json file:

为了能够将您的php应用程序与RabbitMQ集成,您将需要php-amqplib库。 使用composer可以轻松实现,只需在composer.json文件中定义需求:

{ ... "require": { ..., "videlalvaro/php-amqplib": "2.2.*" } ... }

After a composer update execution you’ll be all set.

在执行composer update后,您将一切准备就绪。

免责声明 (Disclaimer)

By no means should this code ever be used in production ready applications nor be executed on production servers. No security checks and/or validations were enforced. This code was written for educational purposes only, having the scope to showcase basic functionality only. Performance, efficiency, or reusability were not a priority.

绝对不要在生产准备就绪的应用程序中使用此代码,也不要在生产服务器上执行该代码。 没有执行安全检查和/或验证。 该代码仅用于教育目的,具有仅展示基本功能的范围。 性能,效率或可重用性不是优先事项。

Please note that even though the following examples uses the same host for the producer, broker, and consumer applications to ease development, deployment, and testing, in real life it would make no sense to have a “distributed system” in the same box.

请注意,即使以下示例在生产者,代理和消费者应用程序中使用相同的主机来简化开发,部署和测试,但在现实生活中,将“分布式系统”放在同一框中是没有意义的。

For full source code listing, you can check this GitHub repository, containing the application used in the following examples.

有关完整的源代码列表,您可以检查此GitHub存储库 ,其中包含以下示例中使用的应用程序。

简单示例:发送请求以异步处理数据 (Simple example: send request to process data asynchronously)

Suppose we have a pizza company, and we receive online orders. Let’s also suppose we have an automated system that processes orders, but this system cannot be exposed directly to the public…

假设我们有一家比萨公司,并且我们收到在线订单。 我们还假设我们有一个处理订单的自动化系统,但是该系统不能直接向公众公开...

We will implement the simplest of patterns:

我们将实现最简单的模式:

First of all, we have the following script to accept requests from the form:

首先,我们有以下脚本可以接受来自表单的请求:

<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleSender; $theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING); $simpleSender = new SimpleSender(); $simpleSender->execute($theName); header("Location: orderReceived.html");

This code will simply check for a POST parameter named theName and send it to an object we created for processing it. Let’s take a look at the SimpleSender::execute() method:

此代码将仅检查名为theName的POST参数,并将其发送到我们创建的用于处理它的对象。 让我们看一下SimpleSender :: execute()方法:

<?php // ... SOME CODE HERE ... /** * Sends a message to the pizzaTime queue. * * @param string $message */ public function execute($message) { /** * Create a connection to RabbitAMQP */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /** @var $channel AMQPChannel */ $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); $msg = new AMQPMessage($message); $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); $channel->close(); $connection->close(); }

A line-by-line breakdown is as follows:

逐行细分如下:

<?php /* ... MORE CODE HERE ... */ $connection = new AMQPConnection( 'localhost', #host - host name where the RabbitMQ server is runing 5672, #port - port number of the service, 5672 is the default 'guest', #user - username to connect to server 'guest' #password ); /* ... MORE CODE HERE ... */

First, we create a connection object. Please be aware that the credentials guest:guest are the default for RabbitMQ. However, you will only be allowed to connect to the server using those if you connect from within the same host (localhost).

首先,我们创建一个连接对象。 请注意,凭据guest:guest是RabbitMQ的默认设置。 但是,仅当您从同一主机(本地主机)进行连接时,才允许使用这些服务器连接服务器。

Since RabbitMQ listens and serves using a single port, we need to create a channel (think of it as a virtual port) with $channel = $connection->channel(); so other clients are able to connect to the server.

由于RabbitMQ使用单个端口监听和服务,因此我们需要使用$channel = $connection->channel();创建一个通道(将其视为虚拟端口$channel = $connection->channel(); 这样其他客户端就可以连接到服务器。

<?php /* ... MORE CODE HERE ... */ $channel->queue_declare( 'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters false, #passive - can use this to check whether an exchange exists without modifying the server state false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart false, #exclusive - used by only one connection and the queue will be deleted when that connection closes false #autodelete - queue is deleted when last consumer unsubscribes ); /* ... MORE CODE HERE ... */

Once we have our channel ready, let’s declare a queue to send the request to. The good thing about RabbitMQ is that we can create queues directly from the client, but we have to be careful how we create it. Let’s explain briefly the parameters used to create a queue with $channel->queue_declare()

通道准备就绪后,让我们声明一个队列来发送请求。 RabbitMQ的好处是我们可以直接从客户端创建队列,但是我们必须小心如何创建队列。 让我们简要地解释用于通过$channel->queue_declare()创建队列的参数。

Queue Name: this is an arbitrary name, will be used to identify the queue

队列名称 :这是一个任意名称,将用于标识队列

Passive: if set to true, the server will only check if the queue can be created, false will actually attempt to create the queue.

被动 :如果设置为true,则服务器将仅检查是否可以创建队列,而false将实际尝试创建队列。

Durable: Typically, if the server stops or crashes, all queues and messages are lost… unless we declare the queue durable, in which case the queue will persist if the server is restarted.

持久 :通常,如果服务器停止或崩溃,则所有队列和消息都将丢失……除非我们声明该队列为持久性,否则在这种情况下,如果重新启动服务器,该队列将继续存在。

Exclusive: If true, the queue can only be used by the connection that created it.

Exclusive :如果为true,则队列只能由创建它的连接使用。

Autodelete: if true, the queue will be deleted once it has no messages and there are no subscribers connected

自动删除 :如果为true,则队列将在没有消息且没有连接订户的情况下被删除

In our example the queue will not persist if the server is restarted, can be used by other connections and will not be deleted if there are no more subscribers to it.

在我们的示例中,如果重新启动服务器,队列将不会持久,可以被其他连接使用,并且如果没有更多的订阅者,队列也不会被删除。

Next, we created a message object with $msg = new AMQPMessage($message);, $message being the POST parameter, theName that we received from the form. A message can be any string.

接下来,我们使用$msg = new AMQPMessage($message);创建一个消息对象$msg = new AMQPMessage($message); , $message是POST参数,即我们从表单收到的theName 。 消息可以是任何字符串。

<?php /* ... MORE CODE HERE ... */ $channel->basic_publish( $msg, #message '', #exchange 'pizzaTime' #routing key ); /* ... MORE CODE HERE ... */

Now we have to publish the message to the queue. However, we cannot publish messages directly to the queue if it is not through an exchange. We never declared an exchange, so how will this be possible? It turns out that when we create a queue without defining an exchange to bind the queue to, a default exchange will be used. We can publish the message to the queue through the default exchange with $channel->basic_publish(), the parameters it uses are:

现在我们必须将消息发布到队列中。 但是,如果不是通过交换,则无法将消息直接发布到队列。 我们从未宣布过交换,那怎么可能呢? 事实证明,当我们创建队列时未定义将队列绑定到的交换时,将使用默认交换。 我们可以使用$channel->basic_publish()通过默认的交换将消息发布到队列中,它使用的参数是:

Message: the message we want to send

消息:我们要发送的消息 Exchange: notice that we are using an empty string, because we will use the default exchange

交换:请注意,我们使用的是空字符串,因为我们将使用默认交换 Routing key: the queue name we want the message to be delivered to.

路由密钥:我们要将邮件传递到的队列名称。 <?php /* ... MORE CODE HERE ... */ $channel->close(); $connection->close(); /* ... MORE CODE HERE ... */

After we are done, we have to close the connection to the channel and the server.

完成后,我们必须关闭与通道和服务器的连接。

Please notice that we did not receive any response back from the server. At best we can be sure that the message was queued but we totally ignore if the message reached the end recipient. This is part of the beauty of AMQP… we can quickly dispatch customers on our public site and asynchronously process the orders on a back-office application.

请注意,我们没有收到服务器的任何回复。 充其量我们可以确定邮件已排队,但是我们完全忽略了邮件是否到达最终收件人。 这是AMQP之美的一部分……我们可以在公共站点上快速调度客户,并在后台应用程序上异步处理订单。

So the pizza order is in the queue, how do we retrieve those? First of all we have to be aware that a consumer has to establish a constant connection to the queue server (a.k.a. subscribe) in order for it to receive messages from the server. The server will not push those messages by itself to our application. Fortunately, creating that connection is super easy.

因此,比萨饼订单在排队中,我们如何检索这些比萨饼? 首先,我们必须意识到,使用者必须建立与队列服务器的恒定连接(也称为订阅),才能使其从服务器接收消息。 服务器不会自行将这些消息推送到我们的应用程序。 幸运的是,创建该连接非常容易。

<?php namespace Acme\AmqpWrapper; use PhpAmqpLib\Connection\AMQPConnection; class SimpleReceiver { /* ... SOME CODE HERE ... */ /** * Listens for incoming messages */ public function listen() { $connection = new AMQPConnection( 'localhost', #host 5672, #port 'guest', #user 'guest' #password ); $channel = $connection->channel(); $channel->queue_declare( 'pizzaTime', #queue name, the same as the sender false, #passive false, #durable false, #exclusive false #autodelete ); $channel->basic_consume( 'pizzaTime', #queue '', #consumer tag - Identifier for the consumer, valid within the current channel. just string false, #no local - TRUE: the server will not send messages to the connection that published them true, #no ack - send a proper acknowledgment from the worker, once we're done with a task false, #exclusive - queues may only be accessed by the current connection false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method array($this, 'processOrder') #callback - method that will receive the message ); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } /** * @param $msg */ public function processOrder($msg) { /* ... CODE TO PROCESS ORDER HERE ... */ } }

Just as we connected, created a channel and declared a queue in the producer, we have to do exactly the same inside the consumer. However in the consumer, we have to subscribe to the channel with $channel->basic_consume(), and the parameters used are defined as follows:

正如我们在生产者中建立连接,创建通道并声明队列一样,我们必须在消费者中执行完全相同的操作。 但是,在使用者中,我们必须使用$channel->basic_consume()订阅频道,并且使用的参数定义如下:

Queue: has to be the same queue name we defined in the producer

队列:必须与生产者中定义的队列名称相同 Consumer tag: an arbitrary name given to the consumer. If this field is empty the server will generate a unique tag

消费者标签:给消费者的任意名称。 如果此字段为空,服务器将生成一个唯一标签 No local: This is an obscure parameter, if activated, the server will not deliver its own messages

无本地:这是一个晦涩的参数,如果激活,服务器将不会传递自己的消息 No Ack(nowledgement): will automatically acknowledge that the consumer received the message, so we do not have to manually do so.

No Ack(nowledgement):将自动确认消费者已收到该消息,因此我们不必手动这样做。 No wait: If set, the server will not wait for the process in the consumer to complete

不等待:如果设置,则服务器将不等待使用方中的过程完成

Callback: can be a function name, an array containing the object and the method name, or a closure that will receive the queued message. This callback has to accept a parameter, containing such a message. In our example, array($this, 'processOrder') is used to define the processOrder() method of the current object as the callback.

回调:可以是函数名称,包含对象和方法名称的数组或将接收排队消息的闭包。 此回调必须接受包含此类消息的参数。 在我们的示例中, array($this, 'processOrder')用于将当前对象的processOrder()方法定义为回调。

<?php /* ... SOME CODE HERE ... */ while(count($channel->callbacks)) { $channel->wait(); } /* ... SOME CODE HERE ... */

The ‘magic’ happens inside the while loop. If the subscriber has at least one defined callback, we will wait for any event in the channel. Every time a messaged is received, our defined callback processOrder() will be executed, so we can process the message as we need.

“魔术”发生在while循环内。 如果订户具有至少一个已定义的回调,则我们将等待通道中的任何事件。 每次收到消息时,都会执行我们定义的回调processOrder() ,因此我们可以根据需要处理消息。

How do we fire this up? Simple create a script that will invoke the SimpleReceiver::listen() method like:

我们如何点火呢? 简单创建一个脚本,该脚本将调用SimpleReceiver::listen()方法,例如:

<?php chdir(dirname(__DIR__)); require_once('vendor/autoload.php'); use Acme\AmqpWrapper\SimpleReceiver; $receiver = new SimpleReceiver(); $receiver->listen();

Now launch the process in the console with php <script name> and let it do its job. If you want to kill the consumer, a simple Ctrl + C will interrupt the process.

现在,使用php <script name>在控制台中启动该过程,并使其完成工作。 如果要杀死使用者,则简单的Ctrl + C会中断该过程。

One of the nice things of having a queue server is that if for whichever reason your consumer job crashes, it will not disrupt the service to the users of your public application. The messages will simply stack inside the queue and once you relaunch the consumer, the messages will be delivered one by one to it for processing.

拥有队列服务器的好处之一就是,无论出于何种原因您的使用者作业崩溃,它都不会中断对公共应用程序用户的服务。 消息将简单地堆积在队列中,并且一旦您重新启动使用者,消息将被逐一传递给它进行处理。

结论 (Conclusion)

In this part, we introduced the theory of AMQP and queueing systems and demonstrated their use on a simple example. In the followup to this post, we’ll deal with two more examples of increased complexity and advanced concepts. Stay tuned!

在这一部分中,我们介绍了AMQP和排队系统的理论,并在一个简单的示例中演示了它们的使用。 在本文的后续中,我们将再处理两个示例,这些示例具有更高的复杂性和高级概念。 敬请关注!

翻译自: https://www.sitepoint.com/use-rabbitmq-php/

相关资源:php 使用rabbitmq
最新回复(0)