博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq学习记录(五)交换机Exchange-fanout
阅读量:6935 次
发布时间:2019-06-27

本文共 5784 字,大约阅读时间需要 19 分钟。

hot3.png

之前学习的都是一条消息发给一个消费者,下面开始记录如何把一条信息发给多个消费者

这边我们用到了交换机Exchange

交换机模式:fanout

模式特点:生产者把消息发送给Exchange之后,Exchange则会把这些消息添加到与自己绑定的所有队列之中,监听这些队列的消费者就可以收到这些消息。

注:Exchange并不能保存信息,如果没有绑定的队列,那么生产者发送数据就会丢失,只有队列才能存储生产者的消息。

cfd5c66bf6bb116f48bb94dd345bde24445.jpg

生产者:声明交换机后指定交换机模式fanout

package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Producer {	// exchange名称	private static final String EXCHANGE_NAME = "exchange_fanout"; 		public static void main(String[] args) {		Connection connection = null;		Channel channel = null;		try {			// 获取连接			connection = ConnectionUtil.getConnection();			// 创建通道			channel = connection.createChannel();			// 声明交换机			channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);			// 生产者发送的信息			String msg = "msg from producer:";			for(int i=0;i<10;i++) {				msg = "msg from producer :" + i;				System.out.println("send msg : "+msg);				// 发送信息				channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());			}		} catch (Exception e) {			e.printStackTrace();		} finally {			// 关闭通道			try {				channel.close();			} catch (IOException e) {				e.printStackTrace();			} catch (TimeoutException e) {				e.printStackTrace();			}			// 关闭连接			try {				connection.close();			} catch (IOException e) {				e.printStackTrace();			}		}			}	}

 

消费者1:声明队列fanout_exchange_to_queue_01并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer01 {	// 交换机名称	private static final String EXCHANGE_NAME = "exchange_fanout"; 		// 监听队列名称	private static final String QUEUE_NAME = "fanout_exchange_to_queue_01";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			// 绑定队列到交换机			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[1]:receive msg:"+msg);					System.out.println("[1]:deal msg successful.");				}							};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);								} catch (Exception e) {			e.printStackTrace();		}	}	}

消费者2:声明队列fanout_exchange_to_queue_02并绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer02 {	// 交换机名称	private static final String EXCHANGE_NAME = "exchange_fanout"; 		// 监听队列名称	private static final String QUEUE_NAME = "fanout_exchange_to_queue_02";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			// 绑定队列到交换机			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[2]:receive msg:"+msg);					System.out.println("[2]:deal msg successful.");				}			};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);		} catch (Exception e) {			e.printStackTrace();		}	}	}

消费者3:声明队列fanout_exchange_to_queue_03,没有绑定到交换机exchange_fanout

package com.example.demo.queue.exchangeToQueue.fanout;import java.io.IOException;import com.example.demo.utils.ConnectionUtil;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer03 {	// 交换机名称	private static final String EXCHANGE_NAME = "exchange_fanout"; 		// 监听队列名称	private static final String QUEUE_NAME = "fanout_exchange_to_queue_03";		public static void main(String[] args) {		try {			// 获取连接			Connection connection = ConnectionUtil.getConnection();			// 创建通道			final Channel channel = connection.createChannel();			// 声明队列			channel.queueDeclare(QUEUE_NAME, true, false, false, null);			// 绑定队列到交换机//			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");			// 定义消费者			DefaultConsumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {					String msg = new String(body,"UTF-8");					System.out.println("[3]:receive msg:"+msg);					System.out.println("[3]:deal msg successful.");				}			};			// 接收信息			channel.basicConsume(QUEUE_NAME, true, consumer);		} catch (Exception e) {			e.printStackTrace();		}	}	}

这次,涉及到了交换机,而在消费者类中中并没有声明交换机,所以需要先执行生产者类的main方法

7b062a42d870b6a6278afc1d7c6651e96bf.jpg

99bad760bcfc38d894eff578c19b2453641.jpg

87128f83c8e1ee2580452511fb074cbcf6b.jpg

可以看到生产者已经成功发送消息给交换机,交换机也成功创建了,但是消息数据丢失了,原因就是上面说的,交换机无法保存从生产者那接收到的信息,只有队列可以保存消息。

接下来,我们就可以依次执行三个消费者类的main方法,之后再执行生产者类的main方法。

消费者1终端:

dcafd15a84e6f852aaaacb14dcd05e205f0.jpg

消费者2终端:

3a31268686739225973d6d3d262f3c3cbad.jpg

消费者3终端:

2700088c1116e061d999fe1f4ab90d2d661.jpg

 

 

转载于:https://my.oschina.net/u/3229807/blog/1860321

你可能感兴趣的文章
mongodb分享(二)
查看>>
template模板的使用方法
查看>>
想成为一个高效的Web开发者吗?来看看大牛分享的经验吧~ #精选JAVASCRIPT前端开发...
查看>>
删除某个文件夹下的所有文件
查看>>
多线程 -- 实现秒抓
查看>>
1032 Sharing
查看>>
Leetcode | Minimum/Maximum Depth of Binary Tree
查看>>
引水工程
查看>>
centos 7 安装tomcat
查看>>
UIViewController的生命周期及iOS程序执行顺序
查看>>
LINUX C: 获取本地指定网卡的IP地址
查看>>
Centos 常用系统命令
查看>>
本地yum源建立
查看>>
12. MySQL简单使用
查看>>
7. Python运算符之逻辑、成员、身份运算符及优先级
查看>>
继续畅通工程
查看>>
Jconsole的使用
查看>>
mysql返回记录的ROWNUM(转)
查看>>
2019-2-18
查看>>
mysql常用
查看>>