消息丢失
消息中间件使用过程中,消息丢失是一件不可避免和难以处理和分析的事情,需要打印相关日志,并且深入研究消息中间件底层去分析数据丢失的潜在环节,才能结合业务去调整处理,生产者、消费者、消息中间件都会存在消息丢失,当然除了网络因素外,绝大部分都是开发者自身没有很好的处理和思考该问题,所以容易在排查问题的时候直接抛出服务正常
劳务实名制系统:服务正常的
中间件维护人员:服务正常的
人脸库管理系统:服务正常的
消息生产环节
RokcetMQ 生产者发送的模式有三种,异步发送、同步发送、单向发送;基于大家使用消息中间件的高性能和高吞吐,默认都是异步发送模式
异步发送模式
异步发送数据后,只要没有抛出异常,该消息默认是发送成功的,如果抛出异常,直接回滚本地事务,这种做法处理普通业务也可行的,但是当存在调用第三方组件的时候,操作缓存 Redis、搜索引擎 Elasticsearch 的时候不是很适用
同步发送模式+反复多次重试
这种方式用的也比较多,主要的原因是比较简单和高效,直接同步发送,等待消息的响应信息,确保消息发送成功,再去处理第三方组件问题,但是也存在一个问题就是,如果此时第三方组件不可用或者其他业务子系统不可用,单方面的发送消息也会导致数据的不一致,你想想看,如果此时消费者读取到了这条消息,然而你其他业务系统和组件不可用
事务消息机制
事务消息机制可以保证在第三方组件和业务子系统不可用的同时,告诉消息中间件服务,我发的是事务消息,先别急着给消费者看到,我要确保我的第三方组件、业务子系统都操作成功,也可以理解为本地事务操作成功,你再给我 commit,否则 rollback 这条发送的消息,同时解决 同步发送模式+反复多次重试 导致消息发送性能降低,不过部分业务是可以退化到该方案,毕竟不是所有的业务都需要这么高的性能,大部分情况下也不会出现网络抖动、延迟,导致重复的发送
1 | "tx-add-employee-group") (txProducerGroup = |
消息中间件环节
消息中间件的作用就是接受生产者的消息,给消费者提供消息;此时需要在此基础上增加高性能的写入和读取,以及高可用、高吞吐;所以这个环节是最不能出现消息丢失的,毕竟公司的很多业务系统都是使用同一套 MQ 服务组件,对应的生产者和消费者都是多对一的关系
刚刚说到保持高性能的读写基本是离不开内存和文件IO读写,mmap 内存技术的使用和 Netty 被运用其中;CommitLog 文件一般是 1 G 的大小,是因为 mmap 操作内存的大小也就是 1 ~ 1.5 G;此时 CommitLog 在 OS Cache 和磁盘文件做了一个内存映射,所有的消息都是直接写入到 OS cache 中,如果此时 Broker 宕机或者 正好在 Leader 的重新选举,都是会丢失消息数据的,OS Cache + 异步刷盘保证到 Broker 的高性能、高吞吐处理生产者写入的消息
Broker 默认是使用异步刷盘的,可以修改 Broker Config 文件来进行更改,当然如果所有的写入消息都需要直接写入磁盘文件,那么 Borker 的处理性能会降低几十倍、上百倍,内存的性能和磁盘IO的性能是有非常大的差距,不够还有一种方案是,采用数据冗余的形式,Master Broker 集群模式派上用场,此时某台机器宕机导致消息丢失,冗余的那台 Master Broker 可以立马顶上,冗余备份 + 异步刷盘 可以同时兼备高性能的写入和消息不丢失
1 | [root@iZuf6iq8e7ya9v3ix71k0pZ conf]# vim broker.conf |
消息消费环节
消息消费环节的丢失,消费者正好读取一批消息到内存中,宕机了,此时消息就丢失了,可以这么理解么,这种情况的消息丢失反而影响不大,因为消息是否被消费是由消息中间件的 Broker 来标记的,此时你并没有提交消息的 offset 到 Broker 去,机器重启后,下一次消息的拉取还是这批消息,因为这批消息并消费,消费者只要保证业务处理完的同时,手动的提交 offset 到 Broker 中,这批消息业务的处理此时需要同步执行,防止子线程异步处理的形式,导致消息并未处理而提前提交 offset 问题
1 | public class Consumer { |