Java - 发布/订阅模式
发布/订阅模式(Publish/Subscribe)引入pom生产者消费者·接收短信消费者·接收邮件补充:结合工作队列模式
发布/订阅模式(Publish/Subscribe)
一个消息 , 多个接收
特点:
一个生产者将消息发送给交换机与交换机绑定的有多个队列,每个消费者监听自己的队列生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。如果消息发送给没有绑定队列的交换机上,消息将丢失。他比工作队列模式(work queue)更为强大,可以看成多个工作队列。
引入pom
<dependency>
<groupId>com.rabbitmq
</groupId>
<artifactId>amqp-client
</artifactId>
</dependency>
生产者
public class Producer {
private final static String QUEUE_EMAIL
= "queue_email";
private final static String QUEUE_SMS
= "queue_sms";
private final static String EXCHANGE_NAME
= "exchange_fanout_1";
public static void main(String
[] args
) throws IOException
, TimeoutException
{
ConnectionFactory factory
= new ConnectionFactory();
factory
.setHost("106.**.**.82");
factory
.setPort(5672);
factory
.setVirtualHost("/");
factory
.setUsername("guest");
factory
.setPassword("guest");
Connection connection
= null
;
Channel channel
= null
;
try{
connection
= factory
.newConnection();
channel
= connection
.createChannel();
channel
.exchangeDeclare(EXCHANGE_NAME
, BuiltinExchangeType
.FANOUT
);
channel
.queueDeclare(QUEUE_EMAIL
,false,false,false, null
);
channel
.queueDeclare(QUEUE_SMS
,false,false,false,null
);
channel
.queueBind(QUEUE_EMAIL
,EXCHANGE_NAME
,"");
channel
.queueBind(QUEUE_SMS
,EXCHANGE_NAME
,"");
String message
= "接好了,SMS和EMAIL~~";
channel
.basicPublish(EXCHANGE_NAME
,"",null
,message
.getBytes());
System
.out
.println("send message: "+message
);
} catch (TimeoutException e
) {
e
.printStackTrace();
} catch (IOException e
) {
e
.printStackTrace();
} finally {
channel
.close();
connection
.close();
}
}
}
消费者·接收短信
public class Consumer_SMS {
private final static String QUEUE_EMAIL
= "queue_email";
private final static String QUEUE_SMS
= "queue_sms";
private final static String EXCHANGE_NAME
= "exchange_fanout_1";
public static void main(String
[] args
) {
ConnectionFactory factory
= new ConnectionFactory();
factory
.setHost("106.**.**.82");
factory
.setPort(5672);
factory
.setVirtualHost("/");
factory
.setUsername("guest");
factory
.setPassword("guest");
Connection connection
= null
;
Channel channel
= null
;
try{
connection
= factory
.newConnection();
channel
= connection
.createChannel();
channel
.exchangeDeclare(EXCHANGE_NAME
, BuiltinExchangeType
.FANOUT
);
channel
.queueDeclare(QUEUE_SMS
,false,false,false,null
);
channel
.queueBind(QUEUE_SMS
,EXCHANGE_NAME
,"");
DefaultConsumer consumer
= new DefaultConsumer(channel
){
@Override
public void handleDelivery(String consumerTag
, Envelope envelope
, AMQP
.BasicProperties properties
, byte[] body
) throws IOException
{
String message
= new String (body
, StandardCharsets
.UTF_8
);
System
.out
.println("SMS receive:" + message
);
}
};
channel
.basicConsume(QUEUE_SMS
,true,consumer
);
} catch (TimeoutException | IOException e
) {
e
.printStackTrace();
}
}
}
消费者·接收邮件
public class Consumer_EMAIL {
private final static String QUEUE_EMAIL
= "queue_email";
private final static String QUEUE_SMS
= "queue_sms";
private final static String EXCHANGE_NAME
= "exchange_fanout_1";
public static void main(String
[] args
) {
ConnectionFactory factory
= new ConnectionFactory();
factory
.setHost("106.**.**.82");
factory
.setPort(5672);
factory
.setVirtualHost("/");
factory
.setUsername("guest");
factory
.setPassword("guest");
Connection connection
= null
;
Channel channel
= null
;
try{
connection
= factory
.newConnection();
channel
= connection
.createChannel();
channel
.exchangeDeclare(EXCHANGE_NAME
, BuiltinExchangeType
.FANOUT
);
channel
.queueDeclare(QUEUE_EMAIL
,false,false,false, null
);
channel
.queueBind(QUEUE_EMAIL
,EXCHANGE_NAME
,"");
DefaultConsumer consumer
= new DefaultConsumer(channel
){
@Override
public void handleDelivery(String consumerTag
, Envelope envelope
, AMQP
.BasicProperties properties
, byte[] body
) throws IOException
{
String message
= new String (body
, StandardCharsets
.UTF_8
);
System
.out
.println("Email receive:" + message
);
}
};
channel
.basicConsume(QUEUE_EMAIL
,true,consumer
);
} catch (TimeoutException | IOException e
) {
e
.printStackTrace();
}
}
}
补充:结合工作队列模式
其实只要在 消费者【c1】并列加上一个【c3】即可,c1、c3共享同一队列
public class Consumer_SMS_2 {
private final static String QUEUE_EMAIL
= "queue_email";
private final static String QUEUE_SMS
= "queue_sms";
private final static String EXCHANGE_NAME
= "exchange_fanout_1";
public static void main(String
[] args
) {
ConnectionFactory factory
= new ConnectionFactory();
factory
.setHost("106.**.**.82");
factory
.setPort(5672);
factory
.setVirtualHost("/");
factory
.setUsername("guest");
factory
.setPassword("guest");
Connection connection
= null
;
Channel channel
= null
;
try{
connection
= factory
.newConnection();
channel
= connection
.createChannel();
channel
.exchangeDeclare(EXCHANGE_NAME
, BuiltinExchangeType
.FANOUT
);
channel
.queueDeclare(QUEUE_SMS
,false,false,false,null
);
channel
.queueBind(QUEUE_SMS
,EXCHANGE_NAME
,"");
DefaultConsumer consumer
= new DefaultConsumer(channel
){
@Override
public void handleDelivery(String consumerTag
, Envelope envelope
, AMQP
.BasicProperties properties
, byte[] body
) throws IOException
{
String message
= new String (body
, StandardCharsets
.UTF_8
);
System
.out
.println("SMS_2 receive:" + message
);
}
};
channel
.basicConsume(QUEUE_SMS
,true,consumer
);
} catch (TimeoutException | IOException e
) {
e
.printStackTrace();
}
}
}