之前学习的都是一条消息发给一个消费者,下面开始记录如何把一条信息发给多个消费者
这边我们用到了交换机Exchange
交换机模式:fanout
模式特点:生产者把消息发送给Exchange之后,Exchange则会把这些消息添加到与自己绑定的所有队列之中,监听这些队列的消费者就可以收到这些消息。
注:Exchange并不能保存信息,如果没有绑定的队列,那么生产者发送数据就会丢失,只有队列才能存储生产者的消息。
生产者:声明交换机后指定交换机模式fanout
package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Producer { // exchange名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { // 获取连接 connection = ConnectionUtil.getConnection(); // 创建通道 channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // 生产者发送的信息 String msg = "msg from producer:"; for(int i=0;i<10;i++) { msg = "msg from producer :" + i; System.out.println("send msg : "+msg); // 发送信息 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } // 关闭连接 try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费者1:声明队列fanout_exchange_to_queue_01并绑定到交换机exchange_fanout
package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer01 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; // 监听队列名称 private static final String QUEUE_NAME = "fanout_exchange_to_queue_01"; public static void main(String[] args) { try { // 获取连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[1]:receive msg:"+msg); System.out.println("[1]:deal msg successful."); } }; // 接收信息 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
消费者2:声明队列fanout_exchange_to_queue_02并绑定到交换机exchange_fanout
package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer02 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; // 监听队列名称 private static final String QUEUE_NAME = "fanout_exchange_to_queue_02"; public static void main(String[] args) { try { // 获取连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[2]:receive msg:"+msg); System.out.println("[2]:deal msg successful."); } }; // 接收信息 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
消费者3:声明队列fanout_exchange_to_queue_03,没有绑定到交换机exchange_fanout
package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer03 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; // 监听队列名称 private static final String QUEUE_NAME = "fanout_exchange_to_queue_03"; public static void main(String[] args) { try { // 获取连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列到交换机// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[3]:receive msg:"+msg); System.out.println("[3]:deal msg successful."); } }; // 接收信息 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
这次,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法
可以看到生产者已经成功发送消息给交换机,交换机也成功创建了,但是消息数据丢失了,原因就是上面说的,交换机无法保存从生产者那接收到的信息,只有队列可以保存消息。
接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。
消费者1终端:
消费者2终端:
消费者3终端: