Celery: 实现集群消费和广播消费

tech2023-12-28  87

集群消费

集群消费类似于负载均衡,生产产生的消息只会被一个worker消费一次

代码
worker from kombu import Queue from kombu import Exchange from kombu.common import Broadcast import logging from celery import Celery # logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s' ) logger = logging.getLogger(__name__) # mq config app = Celery("node_worker") app.conf.task_queues = [ Queue(f'write-mysql', routing_key='write-mysql') ] app.conf.task_routes = { "write-mysql": { 'queue': 'write-mysql', 'routing_key': 'write-mysql' } } # rabbit mq的信息 app.conf.broker_url = 'amqp://user:password@127.0.0.1:5672/celeryhost' # task @app.task(name=f"write-mysql") def write_sql_task(message): logger.error(message) 启动worker # 启动worker 1 celery -A workerOne worker -l info -S one/ -n one@%h # 启动worker 2 celery -A workerOne worker -l info -S two/ -n two@%h 进行测试 # 调用代码 from workerOne import write_sql_task write_sql_task.apply_async(args=["is one"]) # 因为使用的是集群消费, 所以你会看到第一次是第一个worker消费, 第二次是第二个worker消费

广播消费

广播消费就是群发的概念,消息被消费的次数取决于有多少个worker

worker from kombu import Queue from kombu.common import Broadcast import logging from celery import Celery # logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s' ) logger = logging.getLogger(__name__) # mq config app = Celery("node_worker") app.conf.task_queues = [ Broadcast(f'write-mysql-broadcast') ] app.conf.task_routes = { "write-mysql-broadcast": { 'queue': 'write-mysql-broadcast' } } app.conf.broker_url = 'amqp://user:password@127.0.0.1:5672/celeryhost' # task @app.task(name=f"write-mysql") def write_sql_task(message): logger.error(message) 启动worker # 启动worker 1 celery -A workerOne worker -l info -S one/ -n one@%h # 启动worker 2 celery -A workerOne worker -l info -S two/ -n two@%h 进行测试 # 调用代码 from workerOne import write_sql_task write_sql_task.apply_async(args=["is one"], queue='write-mysql-broadcast') # 因为是广播消费,所以每次发送消息,都会被消费两次
最新回复(0)