RabbitMQ系列二
文章目录
工作队列模式(work queue)、引入pom、生产者、消费者1、消费者2
工作队列模式(work queue)
一个消息轮流被接收
特点:
- 一个生产者发送消息;
- 多个消费者共同监听同一个队列的消息;
- 消息不能被重复消费;
- 采用轮询的方式将消息平均发送给消费者(c1,c2,c1,c2…交替)。
引入pom
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
生产者
/**
 * Producer for the work-queue demo.
 *
 * <p>Connects to the broker, declares the {@code FIRST-MQ} queue, publishes a
 * single text message to it through the default exchange (empty exchange name,
 * routing key equal to the queue name) and then closes the channel and the
 * connection.
 */
public class Send {
    private final static String QUEUE_NAME = "FIRST-MQ";

    /**
     * Publishes one message to {@link #QUEUE_NAME}.
     *
     * @param args unused
     * @throws IOException      if closing the channel or the connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Arguments: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = "work queue 模式,消息来了。";
            // Encode explicitly as UTF-8: the consumers decode with UTF-8, and the
            // platform default charset is not guaranteed to match.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("send message:" + message);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        } finally {
            // Either resource may still be null if connecting failed; guard so that a
            // NullPointerException does not hide the original error.
            try {
                if (channel != null) {
                    channel.close();
                }
            } finally {
                // Always attempt to release the connection, even if closing the channel threw.
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
消费者1
/**
 * Consumer 1 for the work-queue demo.
 *
 * <p>Connects to the broker, declares the {@code FIRST-MQ} queue and registers
 * an auto-acknowledging consumer on it. The connection is deliberately left
 * open so that the process keeps receiving messages until it is terminated.
 */
public class Recv {
    private final static String QUEUE_NAME = "FIRST-MQ";

    /**
     * Starts the consumer and returns; deliveries are then handled by the
     * callback registered through {@code basicConsume}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Arguments: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages.");

            Consumer consumer = new DefaultConsumer(channel) {
                /**
                 * Prints the received payload (decoded as UTF-8), then pauses
                 * for one second to simulate work.
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("消费者1 receive:" + message);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of letting an undeclared
                        // checked exception escape the callback.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException | TimeoutException e) {
            // InterruptedException is intentionally not listed here: nothing in this
            // try body can throw it, and catching an unthrowable checked exception
            // does not compile.
            e.printStackTrace();
        }
    }
}
消费者2
/**
 * Consumer 2 for the work-queue demo.
 *
 * <p>Connects to the broker, declares the {@code FIRST-MQ} queue and registers
 * an auto-acknowledging consumer on it. The connection is deliberately left
 * open so that the process keeps receiving messages until it is terminated.
 */
public class Recv2 {
    private final static String QUEUE_NAME = "FIRST-MQ";

    /**
     * Starts the consumer and returns; deliveries are then handled by the
     * callback registered through {@code basicConsume}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Arguments: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages.");

            Consumer consumer = new DefaultConsumer(channel) {
                /**
                 * Prints the received payload (decoded as UTF-8), then pauses
                 * for one second to simulate work.
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("消费者2 receive:" + message);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of letting an undeclared
                        // checked exception escape the callback.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException | TimeoutException e) {
            // InterruptedException is intentionally not listed here: nothing in this
            // try body can throw it, and catching an unthrowable checked exception
            // does not compile.
            e.printStackTrace();
        }
    }
}
先启动两个消费者,然后再多次启动生产者,就能看到效果啦。