仅此一招,再无消息乱序的烦恼
创始人
2024-01-29 16:28:59
0

1. 概览

RocketMQ 早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一 MQ 使用规范,需要对其进行抽象和封装…

1.1. 背景

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。

在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:

// 计算 destination
protected String createDestination(String topic, String tag) {if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){return topic + ":" + tag;}else {return topic;}
}
// 发送信息
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

但,在消费消息时,就变的没那么方便了,简单示例如下:

@Service
@RocketMQMessageListener(topic = "consumer-test-topic-1",consumerGroup ="user-message-consumer-1",selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY
)
@Slf4j
public class RocketBasedUserMessageConsumer extends UserMessageConsumerimplements RocketMQListener {@Overridepublic void onMessage(MessageExt message) {String tag = message.getTags();byte[] body = message.getBody();log.info("handle msg body {}", new String(body));switch (tag){case "UserCreatedEvent":UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);handle(createdEvent);return;case "UserEnableEvent":UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);handle(enableEvent);return;case "UserDisableEvent":UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);handle(disableEvent);return;case "UserDeletedEvent":UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);handle(deletedEvent);return;}}
}

该方法有几个问题:

  1. tag 维护成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部数据,增加通讯成本;如果使用 tag1 || tag2 方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;
  2. 充斥大量模板代码,比如 case 分支,反序列化,调用业务方法等;
  3. API 具有侵入性,开发是需要关心 RocketMQ API,存在一定学习成本;

1.2. 目标

提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:

  1. Tag 和代码保持一致,不需要多处配置,新增逻辑自动完成 Tag 注册;
  2. 消除模板方法,类中只保留核心业务方法,框架完成 方法分发、消息反序列化等操作;
  3. 代码零侵入,仅使用注解,无需了解 RocketMQ API;

2. 快速入门

框架依赖
rocketmq-spring-boot-starter 完成消息发送和回收。

2.1. 环境准备

2.1.1. 增加依赖

首先,增加 rocketmq 相关依赖。

org.apache.rocketmqrocketmq-spring-boot-starter2.2.1

然后,增加 lego starter。

com.geekhalo.legolego-starter0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT

2.1.2. 增加配置

在 application.yml 文件中增加 rocketmq 配置。

rocketmq:name-server: http://127.0.0.1:9876producer:group: rocket-demo

2.2. 定义消费者

定义消费者,只需:

  1. 在 Bean 上增加 @TagBasedDispatcherMessageConsumer 注解,并指定 topic 和 consumer
  2. 在 Bean 的方法上添加 @HandleTag 注解,并指定监听的 tag

示例如下:

@TagBasedDispatcherMessageConsumer(topic = "consumer-test-topic",consumer = "user-message-consumer"
)
public class UserMessageConsumer {private final Map> events = Maps.newHashMap();public void clean(){this.events.clear();;}public List getUserEvents(Long userId){return this.events.get(userId);}@HandleTag("UserCreatedEvent")public void handle(UserEvents.UserCreatedEvent userCreatedEvent){List userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());userEvents.add(userCreatedEvent);}@HandleTag("UserEnableEvent")public void handle(UserEvents.UserEnableEvent userEnableEvent){List userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());userEvents.add(userEnableEvent);}@HandleTag("UserDisableEvent")public void handle(UserEvents.UserDisableEvent userDisableEvent){List userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());userEvents.add(userDisableEvent);}@HandleTag("UserDeletedEvent")public void handle(UserEvents.UserDeletedEvent userDeletedEvent){List userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());userEvents.add(userDeletedEvent);}
}

2.3. 测试

编写测试用例如下:

@SpringBootTest(classes = DemoApplication.class)
@Slf4j
class UserMessageConsumerTest {@Autowiredprivate UserMessageConsumer userMessageConsumer;@Autowiredprivate RocketMQTemplate rocketMQTemplate;private List userIds;@BeforeEachvoid setUp() throws InterruptedException {this.userMessageConsumer.clean();this.userIds = new ArrayList<>();for (int i = 0; i< 100; i++){userIds.add(10000L + i);}this.userIds.forEach(userId -> sendMessage(userId));TimeUnit.SECONDS.sleep(3);}private void sendMessage(Long userId) {String topic = "consumer-test-topic";{String tag = "UserCreatedEvent";UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();userCreatedEvent.setUserId(userId);userCreatedEvent.setUserName("Name-" + userId);sendOrderlyMessage(topic, tag, userCreatedEvent);}{String tag = "UserEnableEvent";UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();userEnableEvent.setUserId(userId);userEnableEvent.setUserName("Name-" + userId);sendOrderlyMessage(topic, tag, userEnableEvent);}{String tag = "UserDisableEvent";UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();userDisableEvent.setUserId(userId);userDisableEvent.setUserName("Name-" + userId);sendOrderlyMessage(topic, tag, userDisableEvent);}{String tag = "UserDeletedEvent";UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();userDeletedEvent.setUserId(userId);userDeletedEvent.setUserName("Name-" + userId);sendOrderlyMessage(topic, tag, userDeletedEvent);}}private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {String shardingKey = String.valueOf(event.getUserId());String json = JSON.toJSONString(event);Message msg = MessageBuilder.withPayload(json).build();String destination = createDestination(topic, tag);SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);log.info("Send result is {} for msg", sendResult, msg);}protected String createDestination(String topic, String tag) {if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){return topic + ":" + tag;}else {return topic;}}@AfterEachvoid tearDown() {}@Testvoid getUserEvents() {this.userIds.forEach(userId ->{List userEvents = this.userMessageConsumer.getUserEvents(userId);Assertions.assertEquals(4, userEvents.size());Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);});}
}

启动时,可以看到如下日志:

TagBasedDispatcherConsumerContainer : success to subscribe  http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer

从日志上可以看出,框架以组 group user-message-consumer 创建 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,初始化流程符合预期。

测试逻辑比较简单,逻辑如下:

  1. 创建 100 个用户
  2. 每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent
  3. 消费发送完成后,停顿 3 秒
  4. 依次检测每个用户收到的消息,并对顺序进行检测

观察日志,可以看到发送和消费日志交替出现:

UserMessageConsumerTest        : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms

用例通过,运行结果符合预期。

3. 设计&扩展

3.1. 初始化流程

image

 

框架初始化流程如下:

  1. TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,依次对托管 bean 进行处理;
  2. 如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 注解,便会提取配置信息,构建 TagBasedDispatcherConsumerContainer 实例
  3. TagBasedDispatcherConsumerContainer 收集方法上的 @HandleTag 注解,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等信息构建 DefaultMQPushConsumer 并完成 topic 和 tag 的订阅
  4. TagBasedDispatcherConsumerContainer 内部会构建 tag 与 method 的映射关系,以对指定tag进行处理;

3.2. 运行流程

 

image
运行流程如下:

  1. 消息发送者将消息发送至 MQ;
  2. MQ 将消息发送至 Consumer;
  3. Consumer 收到消息后,根据 tag 对消息进行分发;
  4. 处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;

相关内容

热门资讯

延边农村电商这五年→数字引擎激...   智慧云仓中,精心包装的朝鲜族辣酱与手工煎饼发往全球;直播间里,延边大米、木耳、蜂蜜等农特产品畅销...
活力中国调研行|花样乡村高原“...   盛夏时节,走进平均海拔2800米的青海省西宁市大通回族土族自治县朔北藏族乡边麻沟村,“活力中国调...
传统职业遇见创新思维 你的未来...   齐鲁网·闪电新闻7月24日讯新技术迭代、新产业勃发、新业态涌现,一个个“新”催生着职业图谱的更新...
不怕晒黑就可以不防晒……是真是...   是真是假弄不清?  评论区私我,帮您问专家!  辟谣  硬核打假 用♥辟谣  不听 不信 不传谣...
周冬雨刘昊然被曝分手   7月25日,有媒体曝出周冬雨刘昊然在同一个摄影棚互不打扰,已分手,结束五年恋情。(编辑:杨杨)
宠物店老板和店员浓烟中救出30...   隔壁着火,宠物店内浓烟四起,宠物店老板和店员多次折返,成功救出超30只毛孩子,连小鸟都没放弃“很...
柬埔寨称泰国在边境冲突中使用集...   新华社金边7月25日电 柬埔寨排雷行动和援助受害者管理局25日发表声明说,泰国军队在柬埔寨境内的...
增塑剂超标“问题童鞋”被严查,...   在一些电商平台上,颜色鲜艳、价格低廉的婴幼儿塑料拖鞋很受欢迎,一些店铺号称销量过万。“新华视点”...
好评中国丨让数字丝路更好联通世...   在数字技术蓬勃发展、全球联系日益紧密的当下,2025年世界互联网大会数字丝路发展论坛于7月24日...
今日辟谣(2025年7月25日...   2025年7月25日  辟 谣福建宁德金涵水库爆了?造谣者已被处罚!  详情:近期,某网民在短视...