RabbitMQ 是基于 Erlang 语言开发的开源消息通信中间件
RabbitMQ中的几个概念:
(1)channel:操作MQ的工具
(2)exchange:路由消息到队列中
(3)queue:缓存消息
(4)virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
MQ的官方文档中给出了5个 MQ 的 Demo 示例,对应了几种不同的用法:
(1)基本消息队列(BasicQueue)
(2) 工作消息队列(WorkQueue)
![]()
(3)发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
1️⃣Fanout Exchange:广播
2️⃣Direct Exchange:路由
![]()
3️⃣Topic Exchange:主题
![]()
官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:
(1)publisher:消息发布者,将消息发送到队列 queue
(2)queue:消息队列,负责接受并缓存消息
(3)consumer:订阅队列,处理队列中的消息
(1)基本消息队列的消息发送流程:
1️⃣建立 connection
2️⃣创建 channel
3️⃣利用 channel 声明队列
4️⃣利用 channel 向队列发送消息
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();} }
(2)基本消息队列的消息接收流程:
1️⃣建立 connection
2️⃣创建 channel
3️⃣利用 channel 声明队列
4️⃣定义 consumer 的消费行为 handleDelivery()
5️⃣利用 channel 将消费者与队列绑定
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");} }
上一篇:一个普通骗局!市民田某78万没了
下一篇:jsx代码如何变成dom