实战:流处理丢数了怎么查?
发布日期:2021-06-29 20:34:30 浏览次数:3 分类:技术文章

本文共 5899 字,大约阅读时间需要 19 分钟。

公众号推文规则变了,点击上方 "数据社"关注, 设为星

后台回复【加群】,申请加入数据学习交流群

 

一。问题现象

笔者所在公司基于kakka-flink-es开发了一套日志中心,以方便业务部分通过日志排查业务问题(日志包括全链路日志,业务日志等)。近期业务部门反馈,日志中心在使用过程中出现了数据丢失的情况,由此引发了大家的的疑惑,Flink不是可以确保END-TO-END 的 EXACTLY-ONCE 语义吗?怎么会丢数据?一口大锅扔给了Flink。

二。数据链路背景

各个微服务使用了 log4j2 的 kafkaAppender,并用AsyncAppender 引用了KafkaAppender, 然后以 Async 方式异步地把日志写入到 Kafka topic中,然后 flink 流处理程序实时消费 kafka 中的日志数据并写入到下游的 elasticsearch中,供业务查询日志使用。在使用过程中发现,es中查询得到日志,相对于微服务的原始日志,有数据丢失的现象。之所以得出改结论,是因为微服务通过 log4j 的 rollingFileAppender 也会把日志落在本地一份,通过对比本地日志和 kafka 中的日志,发现有时数据有丢失。各组件目前使用的版本如下:log4j2.9.1, kafka 2.11-2.2.0,flink 1.11,es 6.3.1。

 

三。问题原因分析

由于业务反馈的日志丢失,是通过对比微服务的原始日志与落在es中的最终日志得出的结论,从数据链路上来讲,问题可能出在微服务通过log4j向kafka写日志,kafka自身存储日志,flink消费kafka日志并写入es,以及es自身存储日志,这些数据链路上的各个环节。则我们也因该顺着数据链路,端到端低分析,哪个环节可能出了问题。

首先 Kafka 和 es 自身都可以通过多副本机制提供持久化的存储与容错,在没有出现硬件多节点故障的情况下,是不会出现丢失数据的问题的。在我们的场景中,kafka topic 创建时都指定了多副本,(当然也可以通过集群级别参数default.replication.factor配置默认值),es也是配置了

replica shard,所以存储层面不存在问题。

然后来看下 flink 消费 kafka 数据的逻辑。

Flink提供了一个kafka consumer的实现,即org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer, 该类整合了flink的 checkpoint机制, 通过  checkpoint 机制,flink 提供了作业失败时的恢复能力,应用可以通过配置 checkpoint_interval 来定期触发 Checkpoints 的执行。也就是说,flink通过周期性地执行checkpoint,自己维护了消费kafka数据时的状态即位移信息,以处理作业失败时的恢复。如下英文所述: "The Kafka consumer in Apache Flink integrates with Flink’s checkpointing mechanism as a stateful operator whose state are the read offsets in all Kafka partitions. When a checkpoint is triggered, the offsets for each partition are stored in the checkpoint. Flink’s checkpoint mechanism ensures that the stored states of all operator tasks are consistent, i.e., they are based on the same input data. A checkpoint is completed when all operator tasks successfully stored their state. Hence, the system provides exactly-once state update guarantees when restarting from potential system failures".  "A checkpoint is a consistent copy of the state of a Flink application and includes the reading positions of the input. In case of a failure, Flink recovers an application by loading the application state from the checkpoint and continuing from the restored reading positions as if nothing happened."

所以 Flink 在读取kafka数据并写入es的过程中,无论是正常执行,还是遇到异常后的恢复,都能确保不丢失数据,都能确保eaxctly-once。

我们在 flink 程序日志中偶然可见如下信息: ”commiting offsets to kafka failed. this does not compromise flink'scheckpoints.”, ”offset commitfailed on partition xx at offset xxx: the coordinator is not aware of thismember.”, “commit cannot becompleted since the group has already rebanlanced and assigned the partitionsto another member. this means that the time between subsequent calls to poll()was longer than the configured max.poll.interval.ms, which typically impliesthat the poll loop is spending too much time on message processing. you canaddress this either by increasing max.poll.interval.ms or by reducing themaximum size of bathes returned in poll() with max.poll.records.”,  但这些信息仅代表flink向kafkacommitoffset时有时会失败,并不会引起flink读取kafak并写入es中的数据的丢失。因为在配置enable.auto.commit为false, 配置commit_offsets_on_checkpoints为true的情景下,flink自己维护了自己消费kafka数据的进度状态(即topic-partition的offset),向kafka反馈的offset只是为外部监控工具等提供了监控的便利,flink自身并不使用他们。(当然如果想优化这点,可以调整flink使用的kafka consumer相关参数,如max.poll.records,max.poll.interval.ms,session.timeout.ms等)

相关源码:

四。问题的真正原因

既然以上数据处理和存储环节都没有问题,那么问题很有可能出在数据生成的环节,即微服务通过log4j向kafka写入数据的环节。查看微服务的log4j的配置,发现log4j2.AsyncQueueFullPolicy使用的是Discard,log4j2.discardThreshold使用的是ERROR。该参数的意义是说,在Async appender 的underlying appender ,也就是我们这里的kafkaAppender,向下游写日志的速度跟不上上游微服务日志生成的速度时,如果造成的挤压超过了缓冲区的大小,新产生的 ERROR级别及比 ERROR 更严重级别的日志,就会被抛弃掉不再记录!所以以上整个数据链路上丢数据的问题,应该就是此处log4j2.AsyncQueueFullPolicy配置为Discard引起的!

至此,Flink成功地把锅甩出去了!Flink可以大声说,丢数据这个锅,我们不背!

五。问题解决

咨询业务人员,发现这里是特意将log4j2.AsyncQueueFullPolicy 配置为Discard的,因为微服务大都是关键的线上应用,不能因为要写日志而对业务处理造成比较大的影响。也就是说该配置是由业务特点的需求决定的,不能做改动。所以我们的问题,也就收敛为:如何提高log4j使用 kafkaAppender向kafka写入日志时的速度,尽量少地丢失数据甚至不丢失数据?

该问题的解决,可以从以下几个方面进行优化调整。

一. kafka使用方式的调整(并行度的调整)

kafka整体的并发度是由topic-partition的个数决定的,(其底层机制是:每个topic-partition在磁盘上对应若干个log segment(即若干个文件),但由于topic-partition只能顺序写入顺序读取,所以在同一时间只能向一个log segment即一个文件顺序写入数据,所以我们说kafka整体的读写并发度是由topic-partition的个数决定的)。当前我们的日志中心只使用了一个topic且该topic有3个partition,所以此时所有的微服务在同一时间都是写入到这3个分区对应的3个文件的,相互之间的竞争可想而知。建议扩展topic个数,比如每个微服务对应一个topic,或根据微服务各自的特点将微服务划分为几类,每个类别对应一个kafka topic。

二。kafka broker本身的优化调整

关键参数log.dirs可以配置为不同磁盘分区上的多个目录而不是当前的单个目录,以充分利用多个磁盘的io能力,同时这些磁盘分区最好是kafka独用的不跟其它应用组件共享。这是因为各个topic-partition的log segment文件会以round robin的形式分散到这里的目录下,如果配置为多个磁盘分区上的多个目录,就能利用多个磁盘的Io能力。目前业务现场使用了raid 10磁盘阵列,而不是JBOD 裸盘,这是没有必要的,会造成一定的浪费,因为kafka自身已经做了数据多副本的冗余。其它参数有些也可以调整,如num.io.threads最好配置为磁盘的个数,num.network.threads也可以调整比如为3。

三。log4j写入kafka时的优化调整

可以调大使用的缓冲区的大小log4j2.asyncLoggerRingBufferSize 和log4j2.asyncLoggerConfigRingBufferSize,代表的是缓冲环中slot的个数,默认值是 256*1024,需要注意调大该值时,使用的堆内存也会增大;同时底层kafkaAppender使用的kafka producer相关的参数需要优化,比如:batch.size可以从当前的默认值16384进一步调大, retries 可以从当前的1适当调大(由于上层已经使用了AsyncAppender, 此时KafkaAppender的retry不会阻塞业务), max.block.ms可以从当前的1000即1秒适当调大(默认值是60000即1分钟,由于上层已经使用了AsyncAppender, 此时KafkaAppender的retry不会阻塞业务);compression.type可以配置为lz4,目前是默认值none, 即没有压缩;linger.ms可以适当调大到10到100之间,当前是默认值0。

 

以上就是Flink甩锅的一次记录,这中间涉及到了较多的知识点,包括:

Flink的checkpoint机制;Kafka的架构原理等相关知识;Log4j的 async appender和kafka appender相关知识。 

照例给出参考链接,供大家进一步学习研究。

 

参考链接

https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets

https://kafka.apache.org/documentation/#brokerconfigs

https://logging.apache.org/log4j/2.x/manual/async.html

https://docs.cloudera.com/documentation/enterprise/latest/topics/kafka_performance.html

https://docs.confluent.io/platform/current/kafka/deployment.html

https://www.confluent.io/white-paper/optimizing-your-apache-kafka-deployment/

欢迎加入
大数据
|数仓技术交流群

进群方式:请加微信(微信号:dataclub_bigdata),回复:加群,通过审核会拉你进群。

(备注:行业-职位-城市)

福利时刻

01. 后台回复「数据」,即可领取大数据经典资料。

02. 后台回复「转型」,即可传统数据仓库转型大数据必学资料。

03. 后台回复「加群」,或添加一哥微信IDdataclub_bigdata  拉您入群(大数据|数仓|分析)或领取资料。

关注不迷路~ 各种福利、资源定期分享

你点的每个在看,我都认真当成了喜欢

转载地址:https://dataclub.blog.csdn.net/article/details/114317449 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:流批一体神器 Flink 已成气候!!! Spark 这下彻底没戏了?
下一篇:两万字详解HDFS 分布式文件系统(建议收藏)

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年05月02日 06时33分56秒