这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
思路解读(重点理解):
一个生产者,多个消费者每一个消费者都有自己的一个队列生产者没有直接发消息到队列中,而是发送到交换机每个消费者的队列都绑定到交换机上消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
以用户发邮件案例讲解
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
生产者代码如下:
package com.rabbitmqdemo.Producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmqdemo.Utils.MQConnectionUtils; import java.util.concurrent.TimeoutException; import java.io.IOException; public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建一个新的连接 Connection connection = MQConnectionUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); //3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "fanout_exchange_msg"; // 4.发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } }
邮件消费者代码如下:
package com.rabbitmqdemo.consumer; import com.rabbitmq.client.*; import com.rabbitmqdemo.Utils.MQConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutEmailConsumer { private static final String QUEUE_NAME = "ConsumerFanout_Email"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者启动"); // 1.创建新的连接 Connection connection = MQConnectionUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
短信消费者代码如下:
package com.rabbitmqdemo.consumer; import com.rabbitmq.client.*; import com.rabbitmqdemo.Utils.MQConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutSMSConsumer { private static final String QUEUE_NAME = "ConsumerFanout_SMS"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者启动"); // 1.创建新的连接 Connection connection = MQConnectionUtils.newConnection(); // 2.创建通道 Channel channel = connection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
