RabbitMQ初步到精通-第四章-RabbitMQ工作模式-PUB/SUB
创始人
2024-01-31 05:43:40
0

第四章-RabbitMQ工作模式-PUB/SUB

1.模式介绍

1.1 模式

此模式称为发布订阅模式,从此模式开始,我们就不再使用默认的交换机了,开始定义我们自己的交换机。

此发布订阅模式,使用的交换机类型为Fanout。定义好交换机,消息的传输路径变为,从Producer发出,发送至Fanout类型的交换机,交换机将消息分别推送给和自己绑定的Queue上,

消费者再去消费对应的Queue,此处与前面模式一致。

 1.2 应用场景

此种模式用在什么场景呢,凡是场景中触发一个动作后,后面的流程都需要根据此动作做出反应,而后面的流程一般>1个,流程之间也没有先后顺序,还需要快速执行,即可以使用此模式。

举个栗子:

1. 用户注册成功后发送通知,既要发短信,又发邮件,还发站内信,发信息这些流程也不存在先后顺序,即可以使用这种模式。用户注册后,推送消息到mq, 发短信、发邮件、发站内信分别去消费消息实现信息发送。

2. 支付系统中用户支付成功后,后续要有一系列动作,例如要上B端账,调风控、结算、发消息等逻辑,也同样可以使用此模式应用其中。

1.2 模拟

首先注意到我们的Exchange 类型 是 fanout,

其次在发送的时候 routingkey -填与不填都不再影响 Exchange的分发路由了。

 

2.验证代码

还是举例 小明洗澡的例子。小明一天在看科幻小说,突然睡着了,梦里梦见自己研究出了一种热水器转换开关,这种开关可以将流过的水复制一份。心想太棒了,这以后能省不少水。

于是改造了下自己的太阳能热水器,热水器内部变成了两个水槽,加入此转换器后,分别给两个水槽上水,水管出水1L,两个水槽都能充满1L。改造成功后,便开始拉着自己的女朋友开始了洗澡实验。

2.1 生产者


/*** @author rabbit* @version 1.0.0* @Description 发布订阅模式一个生产者,一个交换机,两个队列,两个消费者* 声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定规则直接绑定。* 生产者创建一个exchange并且指定类型,和一个或多个队列绑定在一起。当生产者发送消息是会发送到exchange中,再由exchange到绑定的队列中* @createTime 2022/07/27 19:34:00*/
public class WaterProducer {public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";//生产者public static void main(String[] args) throws Exception {//1、获取connectionConnection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {sendMsg(channel, i);Thread.sleep(100);}//4、关闭管道和连接channel.close();connection.close();}private static void sendMsg(Channel channel, int k) throws IOException {//3、通过channel创建自己的exchange 并且绑定队列/*** 参数1:exchange的名称* 参数2:指定exchange的类型* FANOUT-Publish/Subscribe* DIRECT-Routing* TOPIC-Topics*/channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");channel.queueBind(PUBSUB_QUEUE_2, "publish-exchange", "");//4、发送消息到exchangeString msg = k + "升";/*** 参数1:指定exchange,使用“”。默认的exchange* 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中* 参数3:指定传递的消息携带的properties* 参数4:指定传递的消息,byte[]类型*/channel.basicPublish("publish-exchange", "", null, msg.getBytes());System.out.println("水龙头放水成功!" + k + "升");}}

2.2 消费者

小明:


/*** @author rabbit* @version 1.0.0* @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者* @createTime 2022/07/27 19:36:00*/
public class XMShowerConsumer {public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(PUBSUB_QUEUE_1, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(),false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调**///3.关闭自动ACKchannel.basicConsume(PUBSUB_QUEUE_1,false,consumer);System.out.println("小明使用热水器中的XM水槽开始洗澡......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

小丽:


/*** @author rabbit* @version 1.0.0* @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者* @createTime 2022/07/27 19:36:00*/
public class XLShowerConsumer {public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(PUBSUB_QUEUE_2, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("小丽洗澡用水: " + new String(body, "UTF-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调**///3.关闭自动ACKchannel.basicConsume(PUBSUB_QUEUE_2, false, consumer);System.out.println("小丽使用热水器中的XL水槽开始洗澡......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

2.3 结果

生产者:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升

消费者小明:

小明洗澡用水: 1升
小明洗澡用水: 2升
小明洗澡用水: 3升
小明洗澡用水: 4升
小明洗澡用水: 5升
小明洗澡用水: 6升
小明洗澡用水: 7升
小明洗澡用水: 8升
小明洗澡用水: 9升
小明洗澡用水: 10升

消费者小丽:

小丽洗澡用水: 1升
小丽洗澡用水: 2升
小丽洗澡用水: 3升
小丽洗澡用水: 4升
小丽洗澡用水: 5升
小丽洗澡用水: 6升
小丽洗澡用水: 7升
小丽洗澡用水: 8升
小丽洗澡用水: 9升
小丽洗澡用水: 10升

最终实现了,生产者出水10L,两个人都使用了10L水洗了澡,刚洗完,小明就睡醒了,原来是一个梦。

3. 总结

从此模式开始,我们接触到了Exchange的创建,及绑定,以及使用了Exchange的类型 Fanout。

而以前我们在simple模式及work模式中,用到的默认Exchange类型都是Direct类型。

核心代码-声明Exchange:

channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);

核心代码-绑定queue

channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");

核心代码-发送给Fanout的Exchange

channel.basicPublish("publish-exchange", "", null, msg.getBytes());

相关内容

热门资讯

2025“三农”成绩单|筑牢粮...   粮食安全,是“国之大者”。  10月23日,在北大荒集团黑龙江格球山农场有限公司一烘干厂,工人驾...
视频丨赏冰雕品美食 穿唐装游古...   随着元旦临近,不少地方已经提前开启了旅游市场的预热活动。在青海省西宁市城北区的北川青唐城,大型雾...
一线见闻丨一条通道串联三大都市...   “十五五”规划建议提出,要增强区域发展协调性,巩固提升粤港澳大湾区高质量发展动力源作用。广东省中...
爱奇艺回应充25年会员退费难:...   12月28日,话题“男子爱奇艺会员充了25年遇退费难”登上微博热搜。  12月25日,河南许昌,...
30日至2026年1月2日强冷...   央视网消息:据中央气象台网站消息,预计12月30日至2026年1月2日,强冷空气将影响中东部大部...
财政部:2026年继续安排资金...   财政部部长蓝佛安在12月27日至28日举行的全国财政工作会议上表示,明年财政将大力提振消费。深入...
(年终特稿)中国经济这一年:民...   中新社北京12月28日电 (记者 刘亮)2025年,中国民营经济迎风破浪前行,不断在新赛道上跑出...
欧洲最高火山时隔半年再次喷发   位于意大利西西里岛东岸的埃特纳火山于当地时间12月26日开始喷发。  12月27日的画面显示,火...
香港“兴”观察|创新赋能 提质...   茶餐厅老板投身“谷子经济”大受欢迎,钢铁公司用大数据传承焊接技艺,循环经济让零农药、零化肥蔬菜走...
铁路公安加强巡逻检查为多条新建...   近期,多条新建高铁陆续开通运营。各地铁路公安机关强化路地公安警务融合,加强与铁路企业协作,强化巡...