RocketMQ 消息零丢失分析

消息丢失

消息中间件使用过程中,消息丢失是一件不可避免和难以处理和分析的事情,需要打印相关日志,并且深入研究消息中间件底层去分析数据丢失的潜在环节,才能结合业务去调整处理,生产者、消费者、消息中间件都会存在消息丢失,当然除了网络因素外,绝大部分都是开发者自身没有很好的处理和思考该问题,所以容易在排查问题的时候直接抛出服务正常

劳务实名制系统:服务正常的

中间件维护人员:服务正常的

人脸库管理系统:服务正常的

消息生产环节

RokcetMQ 生产者发送的模式有三种,异步发送、同步发送、单向发送;基于大家使用消息中间件的高性能和高吞吐,默认都是异步发送模式

异步发送模式

异步发送数据后,只要没有抛出异常,该消息默认是发送成功的,如果抛出异常,直接回滚本地事务,这种做法处理普通业务也可行的,但是当存在调用第三方组件的时候,操作缓存 Redis、搜索引擎 Elasticsearch 的时候不是很适用

同步发送模式+反复多次重试

这种方式用的也比较多,主要的原因是比较简单和高效,直接同步发送,等待消息的响应信息,确保消息发送成功,再去处理第三方组件问题,但是也存在一个问题就是,如果此时第三方组件不可用或者其他业务子系统不可用,单方面的发送消息也会导致数据的不一致,你想想看,如果此时消费者读取到了这条消息,然而你其他业务系统和组件不可用

事务消息机制

事务消息机制可以保证在第三方组件和业务子系统不可用的同时,告诉消息中间件服务,我发的是事务消息,先别急着给消费者看到,我要确保我的第三方组件、业务子系统都操作成功,也可以理解为本地事务操作成功,你再给我 commit,否则 rollback 这条发送的消息,同时解决 同步发送模式+反复多次重试 导致消息发送性能降低,不过部分业务是可以退化到该方案,毕竟不是所有的业务都需要这么高的性能,大部分情况下也不会出现网络抖动、延迟,导致重复的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@RocketMQTransactionListener(txProducerGroup = "tx-add-employee-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

private final IRocketmqTransactionLogService rocketmqTransactionLogService;


// half 消息发送成功了
// 就会在这里回调这个函数,就可以处理处理本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

try {
// 处理本地事务
// 如果本地事务执行成功,返回 COMMIT
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {

// 本地事务处理失败,回滚一切执行过的操作
// 如果本地事务执行失败,返回 ROLLBACK ,标记 half 消息无效
return RocketMQLocalTransactionState.COMMIT;
}
}

// 如果因为各种原因没有返回 commit 或者 rollback
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

// 查看本地事务是否执行成功
// select * from xxx where transaction_id = xxx
RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogService.getOne(
Wrappers.<RocketmqTransactionLog>lambdaQuery()
.eq(RocketmqTransactionLog::getTransactionId, transactionId),
false
);
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

消息中间件环节

消息中间件的作用就是接受生产者的消息,给消费者提供消息;此时需要在此基础上增加高性能的写入和读取,以及高可用、高吞吐;所以这个环节是最不能出现消息丢失的,毕竟公司的很多业务系统都是使用同一套 MQ 服务组件,对应的生产者和消费者都是多对一的关系

刚刚说到保持高性能的读写基本是离不开内存和文件IO读写,mmap 内存技术的使用和 Netty 被运用其中;CommitLog 文件一般是 1 G 的大小,是因为 mmap 操作内存的大小也就是 1 ~ 1.5 G;此时 CommitLog 在 OS Cache 和磁盘文件做了一个内存映射,所有的消息都是直接写入到 OS cache 中,如果此时 Broker 宕机或者 正好在 Leader 的重新选举,都是会丢失消息数据的,OS Cache + 异步刷盘保证到 Broker 的高性能、高吞吐处理生产者写入的消息

Broker 默认是使用异步刷盘的,可以修改 Broker Config 文件来进行更改,当然如果所有的写入消息都需要直接写入磁盘文件,那么 Borker 的处理性能会降低几十倍、上百倍,内存的性能和磁盘IO的性能是有非常大的差距,不够还有一种方案是,采用数据冗余的形式,Master Broker 集群模式派上用场,此时某台机器宕机导致消息丢失,冗余的那台 Master Broker 可以立马顶上,冗余备份 + 异步刷盘 可以同时兼备高性能的写入和消息不丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@iZuf6iq8e7ya9v3ix71k0pZ conf]# vim broker.conf
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

消息消费环节

消息消费环节的丢失,消费者正好读取一批消息到内存中,宕机了,此时消息就丢失了,可以这么理解么,这种情况的消息丢失反而影响不大,因为消息是否被消费是由消息中间件的 Broker 来标记的,此时你并没有提交消息的 offset 到 Broker 去,机器重启后,下一次消息的拉取还是这批消息,因为这批消息并消费,消费者只要保证业务处理完的同时,手动的提交 offset 到 Broker 中,这批消息业务的处理此时需要同步执行,防止子线程异步处理的形式,导致消息并未处理而提前提交 offset 问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx-add-employee-group");
// 设置NameServer的地址
consumer.setNamesrvAddr("http://localhost:9876");
// 订阅Topic,你要消费哪些Topic的消息
consumer.subscribe("EmployeeAddSuccessTopic", "*");

// 这里注册一个回调接口,去接收获取到的信息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

new Thread() {
@Override
public void run() {
// 子线程处理消息,错误示范
}
}.start();

// 处理这批任务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

}

全链路消息零丢失方案