smniuhe

春风十里,不如你...


  • Home

  • About

  • Tags

  • Categories

  • Archives

  • Schedule

  • Search

RocketMQ 消息零丢失分析

Posted on 2020-06-06 | Edited on 2020-06-07 | In MQ

消息丢失

消息中间件使用过程中,消息丢失是一件不可避免和难以处理和分析的事情,需要打印相关日志,并且深入研究消息中间件底层去分析数据丢失的潜在环节,才能结合业务去调整处理,生产者、消费者、消息中间件都会存在消息丢失,当然除了网络因素外,绝大部分都是开发者自身没有很好的处理和思考该问题,所以容易在排查问题的时候直接抛出服务正常

劳务实名制系统:服务正常的

中间件维护人员:服务正常的

人脸库管理系统:服务正常的

消息生产环节

RokcetMQ 生产者发送的模式有三种,异步发送、同步发送、单向发送;基于大家使用消息中间件的高性能和高吞吐,默认都是异步发送模式

异步发送模式

异步发送数据后,只要没有抛出异常,该消息默认是发送成功的,如果抛出异常,直接回滚本地事务,这种做法处理普通业务也可行的,但是当存在调用第三方组件的时候,操作缓存 Redis、搜索引擎 Elasticsearch 的时候不是很适用

同步发送模式+反复多次重试

这种方式用的也比较多,主要的原因是比较简单和高效,直接同步发送,等待消息的响应信息,确保消息发送成功,再去处理第三方组件问题,但是也存在一个问题就是,如果此时第三方组件不可用或者其他业务子系统不可用,单方面的发送消息也会导致数据的不一致,你想想看,如果此时消费者读取到了这条消息,然而你其他业务系统和组件不可用

事务消息机制

事务消息机制可以保证在第三方组件和业务子系统不可用的同时,告诉消息中间件服务,我发的是事务消息,先别急着给消费者看到,我要确保我的第三方组件、业务子系统都操作成功,也可以理解为本地事务操作成功,你再给我 commit,否则 rollback 这条发送的消息,同时解决 同步发送模式+反复多次重试 导致消息发送性能降低,不过部分业务是可以退化到该方案,毕竟不是所有的业务都需要这么高的性能,大部分情况下也不会出现网络抖动、延迟,导致重复的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@RocketMQTransactionListener(txProducerGroup = "tx-add-employee-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

private final IRocketmqTransactionLogService rocketmqTransactionLogService;


// half 消息发送成功了
// 就会在这里回调这个函数,就可以处理处理本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

try {
// 处理本地事务
// 如果本地事务执行成功,返回 COMMIT
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {

// 本地事务处理失败,回滚一切执行过的操作
// 如果本地事务执行失败,返回 ROLLBACK ,标记 half 消息无效
return RocketMQLocalTransactionState.COMMIT;
}
}

// 如果因为各种原因没有返回 commit 或者 rollback
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

// 查看本地事务是否执行成功
// select * from xxx where transaction_id = xxx
RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogService.getOne(
Wrappers.<RocketmqTransactionLog>lambdaQuery()
.eq(RocketmqTransactionLog::getTransactionId, transactionId),
false
);
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

消息中间件环节

消息中间件的作用就是接受生产者的消息,给消费者提供消息;此时需要在此基础上增加高性能的写入和读取,以及高可用、高吞吐;所以这个环节是最不能出现消息丢失的,毕竟公司的很多业务系统都是使用同一套 MQ 服务组件,对应的生产者和消费者都是多对一的关系

刚刚说到保持高性能的读写基本是离不开内存和文件IO读写,mmap 内存技术的使用和 Netty 被运用其中;CommitLog 文件一般是 1 G 的大小,是因为 mmap 操作内存的大小也就是 1 ~ 1.5 G;此时 CommitLog 在 OS Cache 和磁盘文件做了一个内存映射,所有的消息都是直接写入到 OS cache 中,如果此时 Broker 宕机或者 正好在 Leader 的重新选举,都是会丢失消息数据的,OS Cache + 异步刷盘保证到 Broker 的高性能、高吞吐处理生产者写入的消息

Broker 默认是使用异步刷盘的,可以修改 Broker Config 文件来进行更改,当然如果所有的写入消息都需要直接写入磁盘文件,那么 Borker 的处理性能会降低几十倍、上百倍,内存的性能和磁盘IO的性能是有非常大的差距,不够还有一种方案是,采用数据冗余的形式,Master Broker 集群模式派上用场,此时某台机器宕机导致消息丢失,冗余的那台 Master Broker 可以立马顶上,冗余备份 + 异步刷盘 可以同时兼备高性能的写入和消息不丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@iZuf6iq8e7ya9v3ix71k0pZ conf]# vim broker.conf
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

消息消费环节

消息消费环节的丢失,消费者正好读取一批消息到内存中,宕机了,此时消息就丢失了,可以这么理解么,这种情况的消息丢失反而影响不大,因为消息是否被消费是由消息中间件的 Broker 来标记的,此时你并没有提交消息的 offset 到 Broker 去,机器重启后,下一次消息的拉取还是这批消息,因为这批消息并消费,消费者只要保证业务处理完的同时,手动的提交 offset 到 Broker 中,这批消息业务的处理此时需要同步执行,防止子线程异步处理的形式,导致消息并未处理而提前提交 offset 问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx-add-employee-group");
// 设置NameServer的地址
consumer.setNamesrvAddr("http://localhost:9876");
// 订阅Topic,你要消费哪些Topic的消息
consumer.subscribe("EmployeeAddSuccessTopic", "*");

// 这里注册一个回调接口,去接收获取到的信息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

new Thread() {
@Override
public void run() {
// 子线程处理消息,错误示范
}
}.start();

// 处理这批任务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

}

全链路消息零丢失方案

线上系统压力分析,系统负载倍增时,频繁GC

Posted on 2020-05-31 | Edited on 2020-06-06 | In JVM

线上系统部署架构

4 核 8 GB 的机器,分配了 4GB 给 JVM,3G的内存给堆,新生代和老年代各 1.5 GB,元数据区是 256 MB,栈的内存空间是 1M,人脸库管理系统有几个定时器任务,设备人脸同步任务,设备时间同步任务,还有其他框架的后台线程,和jvm的后台线程,大概几十个线程;
jstat -gc 命令观察到,Eden 内存每秒 2M 速度在新增,600s 左右 Eden区才满,5分钟左右一次YoungGC,GC 系统停顿 350ms,每次 YoungGC 完大概 200~300kb 的存活对象存入 S 区,等到下次 GC,之前存活的 200~300kb 可以同步被回收,从而达到 Old GC 保持的次数一直在零次,此时的系统JVM垃圾回收器使用 NewPar+CMS 是没问题的,因为这是一个后台计算同步的系统,不需要直接面向用户,所以 5 分钟一次 GC,每次停顿 350 ms,对用户的影响也是无感知的

Read more »

RocketMQ ...xxx.RemotingTooMuchRequestException: sendDefaultImpl call timeout

Posted on 2020-05-11 | In MQ

单机环境搭建

依赖

  1. 64 位操作系统,Linux/ Unix/Mac OS

  2. 64位JDK 1.8+

  3. Maven 3.2.x

  4. Git

代码下载

1
git clone https://github.com/apache/rocketmq.git
Read more »

服务器资源有限下的JVM调优

Posted on 2020-02-20 | Edited on 2020-05-11 | In JVM

疫情下的电话

有一天阿洋打我电话,问我系统执行的效率比较慢,而且定时任务总是会出现延迟,短暂时间的停顿阻塞问题,问我能不能优化

系统优化

系统优化就是尽量的增加系统的性能和吞吐量,要想做到这两点必须需要系统优化,应该频繁的YoungGC、OldGC、FullGC会影响系统的性能和吞吐量,系统优化就是JVM优化,JVM优化就是分配合理的内存、参数设置,从而减少GC给系统带来的影响,想要解决这个问题,就必须了解JVM内存结构和GC的特点,不同的厂商JVM的实现不同,不同版本也存在差异;第二点就是没有绝对内存分配、参数设置的推荐值,必须结合系统的实际的运行模型,分析出系统比较占资源的核心业务,深入了解业务以及业务后续发展的规模做出评估,评估出QPS,以及每秒的内存占用情况,结合JVM的内存结构、GC特性来动态分配内存、参数设置从而尽可能的减少GC的次数,减少STW的时间;第三点就是就是很多时候机器资源是有限的,小公司更加看中成本,需要开发者自己去衡量,尽可能增加系统的性能,节约成本

Read more »

优先队列和堆

Posted on 2019-09-10 | In Data Structure

优先队列

wikipedia

优先队列是计算机科学中的一类抽象数据类型。优先队列中的每个元素都有各自的优先级,优先级最高的元素最先得到服务;优先级相同的元素按照其在优先队列中的顺序得到服务。优先队列往往用堆来实现。

普通队列:先进先出;后进后出

优先队列:出队顺序和入队顺序无关;和优先级相关

Read more »

工厂设计模式

Posted on 2019-08-15 | In design model

聊聊工厂模式相关的设计

《设计模式》《代码大全》都要提及抽象工厂模式、工厂方法模式,都需要创建型模式,属于常见的 23 中设计模式中,工作中设计到工厂相关的模式还有另外两种,简单工厂,静态工厂方法,他们之间的区别是什么呢?

Read more »

长字符串如何建立索引

Posted on 2019-07-23 | Edited on 2019-08-15 | In RDB

场景分析

库中有一张表记录着所有客户文件上传或者、客户上传的BASE64图片资源,file_store_record 表中有两个字段 name original_name 分别取记录文件名称和文件的原文件名称,由于文件名称是唯一的,有些做法会在该字段下建立唯一索引,并且都是通过时间戳(秒||毫秒)+ 文件后缀的形式去生成唯一值

1
2
3
name				original.name
15634508992959536.jpg 1寸白底.jpg
15634510398049541.jpg 1寸白底.jpg

索引优化

首先我们分析下上面的情况,能如何去优化,或者说有哪些问题,首先name 的字段类型是 VARCHAR,VARCHAR 存储可变长度的 M 个字符,大小 0-65535 字节,如果是 UTF-8 编码的话,一个字符占 3 字节,如果是 UTF-8mb4,一个字符占 4 字节,UTF8mb4 varchar(10)=40字节;从上面的例子可以看出已经超过了 20 字符,name 字段 varchar(20),索引的存储并不会存储你改列的实际大小,只会存储改字段定义类型所占的字节大小;第二是索引是存储在内存中的,索引的查找也是会设计到比较的,虽然是存储在内存中,一般还是建议索引的类型和长度越短越好,第三的思路是进行比较部分长度,其实在前12,13,14 位的区分度就比较高,可以建立 前缀索引

Read more »

说服艺术

Posted on 2019-06-23 | Edited on 2019-08-15 | In life

每当你遇到难解的问题,不要悲伤,不要着急,总有一个人会给你答案,每次看连叔的解答都能让人产生信心,也能让你不断的去思考。

Read more »

框架 or 基础知识

Posted on 2019-03-31 | Edited on 2019-08-15

前言

鄙人南下就业,以为掌握 Dubbo、SpringCloud 等技术栈就比较占优,不曾想到各种碰壁,事与愿违,在准备面试题的过程中,开始思考,痛并思痛,自己是不是该改变学习策略了;框架和基础知识那个更重要呢?

第一家面试的是亚马逊中国,上来就是一顿算法题,懵了;

第二家面试的是懒人科技,上来也是一顿 Java 基础知识,GC 优化,多线程、并发编程,底层原理,哎… 又被打败了;

刚好看到 极客时间 有相关的文章,甚是欣喜,特意引用,以史为诫。

Read more »

Tomcat NIO、APR 压力测试性能对比

Posted on 2019-02-02 | In tomcat

Tomcat 三种线程模式

一、bio(blocking I/O)

即阻塞式I/O操作,表示Tomcat使用的是传统的Java I/O操作(即java.io包及其子包)。是基于JAVA的HTTP/1.1连接器,Tomcat7以下版本在默认情况下是以bio模式运行的。一般而言,bio模式是三种运行模式中性能最低的一种。我们可以通过Tomcat Manager来查看服务器的当前状态。(Tomcat7或以下,在 Linux 系统中默认使用这种方式)

一个线程处理一个请求,缺点:并发量高时,线程数较多,浪费资源

二、nio(new I/O)

是Java SE 1.4及后续版本提供的一种新的I/O操作方式(即java.nio包及其子包)。Java nio是一个基于缓冲区、并能提供非阻塞I/O操作的Java API,因此nio也被看成是non-blocking I/O的缩写。它拥有比传统I/O操作(bio)更好的并发运行性能。要让Tomcat以nio模式来运行只需要在Tomcat安装目录/conf/server.xml 中将对应的中protocol的属性值改为 org.apache.coyote.http11.Http11NioProtocol即可

利用 Java 的异步请求 IO 处理,可以通过少量的线程处理大量的请求

注意: Tomcat8 以上版本在 Linux 系统中,默认使用的就是NIO模式,不需要额外修改 ,Tomcat7必须修改Connector配置来启动

undefined

三、apr(Apache Portable Runtime/Apache可移植运行时) ( 安装配置过程相对复杂)

Tomcat将以JNI的形式调用Apache HTTP服务器的核心动态链接库来处理文件读取或网络传输操作,从而大大地提高Tomcat对静态文件的处理性能。Tomcat apr也是在Tomcat上运行高并发应用的首选模式。从操作系统级别来解决异步的IO问题

undefined

zhuawang’s blog三种高级运行模式

Read more »
12…6

smniuhe

大家好,我是牛河
56 posts
37 categories
83 tags
GitHub E-Mail
Links
  • 阿里中间件
  • 美团技术团队
  • Mercy Ma
  • CoolShell
  • 当然我在扯淡
  • 周立
  • 芋道源码
  • 程序猿DD
  • 梁桂钊
  • 纯洁的微笑
© 2020 smniuhe
Powered by Hexo v3.8.0
|
Theme – NexT.Muse v7.0.1