Apache Spark 在eBay 的优化
发布日期:2021-06-30 11:28:13 浏览次数:2 分类:技术文章

本文共 6375 字,大约阅读时间需要 21 分钟。

供稿 | eBay DSS Team

作者 | 田川晓阳

编辑 | 顾欣怡

本文4490字,预计阅读时间14分钟

导读

新一代数据开发分析平台ZetaeBay DSS(Data Services and Solutions) 团队自主研,主要针对在Spark SQL运行过程中可能存在的性能隐患及Spark执行计划图的缺,提出相应解决方案,旨在降低Spark SQL优化门槛,助力eBay用户解放分析效能,也希望对同业人员有所启发和帮助。

1. 背景介绍

eBay的大部分数据仓库之前一直构建于商业数据仓库系统Teradata之上,从2017年开始,eBay决定完全基于Hadoop平台来构建数据仓库,并开始了恢弘的数据搬迁之旅。然而随着以Spark为主要计算引擎的新开源平台的落户,复杂的数据访问模式和调优分析以及未经结构化的数据展现等问题开始变成全量投产的瓶颈。

为了解决这个困境,eBay DSS(Data Services & Solutions) 部门挺身而出,自主研发了Zeta——新一代数据开发分析平台,旨在为数据工程师、数据分析师和数据科学家提供跨平台且涵盖全生命周期的数据服务。平台支持基于元数据的数据探索、大数据开发、数据测试、数据分析以及终端的数据可视化等功能。目前,平台已累计服务超过2000多名用户,现有1000多名数据工程师、分析师及产品经理高度依赖Zeta平台来完成日常的数据开发处理和分析工作。

2. 痛点分析

随着大量分析场景的涌入和使用的激增,目前Hadoop平台每天要运行上万个Job,处理超过500PB的数据。与此同时,低质量Spark SQL的大量提交导致性能问题屡屡发生,这严重影响了平台性能,导致了资源浪费。然而,让用户自己对Spark SQL进行优化存在很高的技术门槛。对于缺少Spark经验的使用者来说,进行自主排查并解决问题无疑是一项痛苦甚至不可能完成的任务。

为了能够尽早地发现性能隐患、降低SQL优化的门槛、助力eBay用户解放分析效能,Zeta团队基于以往的实践对这些问题进行了一些探索并研发出了独有的解决方案,接下来本文将对此进行详细阐述。

3. 行业分析

3.1 关于性能分析的行业解决方案

1)Doctor Elephant:作为一个开源的Hadoop和Spark性能监控调优工具,主要是通过采集Spark进程运行过程中记录下来的log来进行性能分析并给出建议;但是采集的数据粒度较粗,并且分析过程滞后,无法实时地看到Job的运行情况的分析,而且给出的建议是针对Spark Job运行过程中底层计算框架级别信息的,对于Spark SQL的Job来说不具有针对性,还需要结合Spark内核知识来进行解读。

2)Ganglia等监控工具:这类工具主要通过采集Spark Metrics和log等在运行过程中暴露出来的内部数据来进行可视化展示,以供用户进行性能分析,但是缺点是太过于底层还需要用户自己做大量的统计分析工作,不够直接。

3)Spark UI中提供的SQL Physical Plan执行计划图:最贴近用户实际需求,并且可以实时地看到Job运行的状态,但是存在一些问题,下文会详细描述。

3.2 现有Spark执行计划图的缺陷

Spark UI 中SQL的物理执行计划图是我们观察Job运行状态最直接的一个重要途径,但这个物理执行计划图存在很多缺陷,比如缺失了很多和Spark进程内部情况相关的度量数据以及和JVM相关的重要信息,而这些缺失的信息都是帮助我们洞察性能瓶颈时不可或缺的线索。针对这种情况,Zeta团队进行了一系列的优化改造。

其实Spark执行计划图的主要缺陷在于用户很难直接从图上提供的信息中找出性能瓶颈。在执行Spark SQL的时候,Spark UI中存在两个可以用来观察分析自己Job运行情况的Tab,分别是Task level的运行状态图(图1)和SQL 的Physical Plan DAG(图2):

图1 Task Level

(点击可查看大图)

图2 Physical Plan DAG

(点击可查看大图)

Task level的Tab内容更加偏底层一些,我们可以获取到很多重要的数据信息,比如当前已处理的数据数量以及和JVM相关的重要数据。但问题是,在SQL特别复杂的时候很难将状态信息和自己的SQL的逻辑计划对应起来。SQL Tab 展示了物理执行计划图,更贴近用户的逻辑计划图,但是缺少Spark执行过程中偏底层的状态信息,比如task粒度的信息。如果发生了数据倾斜,无法直接在这张图上看出问题所在。

因此,在现有的Spark UI 布局下,对于用户来说,想要debug就必须在不同的图之间反复切换,同时还要结合Spark内核原理来分析状态信息从而诊断自己Job的问题所在,这对大部分数据分析师和产品经理来说具有一定的学习成本。而且即便发现了问题,这部分状态信息也不一定能给用户提供解决问题的具体方向,Spark当前所暴露出来的内部信息满足不了解决问题的需求。

4. Zeta 解决方案

4.1 核心问题

一句 SQL从解析到实际被Spark进程执行的过程中会历经如下几个阶段:

图3(点击可查看大图)

如图3所示,SQL最终会被转换成底层的RDD,整个任务执行的DAG图会被分解成一个或者多个具有依赖关系的stage并最终以task为执行单元发送到Spark Executor的进程中去执行。大部分情况下,到了这个阶段就已经无法再从task执行的上下文中找出这个task到底是在执行SQL上对应的哪块逻辑,因为经过Codegen等一系列优化之后理论上已经无法在实际的物理执行过程和最初的逻辑计划上建立映射关系。

对于数据倾斜这种情况,我们可以通过观察Spark UI来进行判定。如果某个stage执行了很长时间,其中少部分task处理的数据又比其他task多很多,那么就证明出现了数据倾斜。以多张表做join为例,如果在shuffle的过程中产生了数据倾斜,为了尽可能将数据分散到不同的进程中进行处理,从而达到平衡工作负载的目的,比较通用的有以下几个方法:修改逻辑,将shuffle时的key尽可能打散;加大shuffle的分区数量从而使数据分散到更多的分区中去;单独找出产生了极大倾斜的key,在逻辑中单独处理最后再和其他部分union起来。

在准备开始解决这个问题之前,我们必须要回答两个核心问题:

1)如何找出SQL逻辑中发生了倾斜的那个部分?

2)如果发生了倾斜,又该如何知道到底倾斜在了哪一些key上呢?

对于问题1),一般来说数据倾斜都发生在会产生shuffle的操作上,比如join和group by等操作。对Spark内核比较熟悉的用户可以根据Spark UI上DAG的实时计划图大致推断出对应在SQL上的逻辑操作。

而问题2)就需要用户自己花额外的功夫来对表的数据做统计分析,比如算出表中用来做join的字段中每个值的数量,并按照值的大小进行排序,由此可以观察出主要有哪些造成倾斜最严重的值,从而进行针对优化。

如果这两个问题Spark能够自动在执行任务的时候解决,并通过一张图的形式很直观地表达出来,然后在用户执行Spark SQL的过程中实时推送给用户,那就可以即时地帮助用户发现执行过程中存在的性能问题,也就能节省大量分析推断和数据调研的时间,大大提高开发效率。

4.2  定制化DAG图

经过内部用户的调研和分析,我们决定通过修改Spark源码来满足以上需求。在现有Spark UI的基础上构建一张能同时包含具有这两张图关键特性的DAG图,既要反映出实际Spark进程中的关键状态信息,又要尽可能地帮助用户,将出现问题的状态映射到SQL中的某个逻辑块上。

Spark SQL经过解析后的Physical Plan中,每个物理算子节点都实现了对RDD的转换过程。所以当最终SQL的执行过程经过一系列转换变成RDD的转换过程后,一个物理算子就可以映射到RDD DAG图中的某一段路径上,然后根据RDD DAG图划分stage的规则,从而将stage映射到SQL Physical plan的某个或者多个相邻节点上,再将Task level的状态映射到Physical Plan的节点上了。

Task level的状态信息非常丰富,包括输入数据的大小以及和shuffle相关的状态信息等,这些信息都是帮助判定是否在某个算子上发生了倾斜的重要线索。同时,SQL Physical Plan的物理算子中,有的算子恰好保留了和逻辑执行计划及对应操作相关的上下文,如SortMergeJoinExec、ShuffledHashJoinExec算子保留了join操作时用到的key的上下文。因此,这样一来就解决了4.1中的问题1)

对于问题2),因为要计算key的值就必须引入额外的计算,而在实际的计算中key的基数又是很大的,所以为了不对Job的整体性能造成过大的影响,我们只需要计算Top N的那几个key即可。从优化倾斜的角度来讲,我们往往只需要找到倾斜最严重的部分key就可以了,而且这些key应该也只是少数。

接下来需要考虑的,就是该如何插入这段计算逻辑,以及如何让这段逻辑覆盖大部分情况下的倾斜度计算。在Spark的shuffle write阶段,其实writer写的时候就已经遍历了每一条数据,但是这个阶段太抽象,想在该阶段对数据进行统计计算并还原成实际处理数据的key并不是很容易,需要做非常多的workaround,这样就不能直接地解决问题。

因此我们还是决定从物理算子入手,通过修改部分物理算子的算法,来达到在做原有计算逻辑的同时也对数据做统计计算。于是我们在这当中对少部分物理算子的算法做了较大的重构。由于做了重构设计,因此当动态计算key的功能启动之时,这少部分原本支持Codegen的物理算子将无法支持Codegen。

在实际的计算过程中,如果开启动态计算key的功能,将会为每一个TaskSet创建一个定制化的AdvancedTaskSetManager,主要作用一是执行原有物理算子的逻辑,二是当发现某些task存在数据倾斜的时候,会额外启动一个TaskSet’来执行统计计算的逻辑,如图4所示:

图4(点击可查看大图)

这个TaskSet’的执行逻辑和正常TaskSet的执行逻辑一样,都是修改过算法后的执行逻辑。不同的是AdvancedTaskSetManager会为这两个TaskSet分别注入不通的TaskContext,从而控制实际Task 在Runtime中执行不同的逻辑分支,一部分进行正常的计算,另外一部分进行统计计算并将结果返回到Driver端进行聚合,从而达到统计汇总的目的。这个改动对Spark原有代码有一定的侵入性。

好了,这下我们需要的运行时的数据都拿到了,接下来要做的就是构建这幅图了。Spark在Driver的初始化进程中会创建一个Spark UI对象,Spark UI会启动一个Jetty的web服务来供外部访问,Driver内部的状态存储对象AppStatusStore会为不同的Tab提供后端Render页面的数据,运行时Spark UI内部的状态如图5所示:

图5(点击可查看大图)

构建的第一步便是记录构建图形所需要的数据。Spark会在运行过程的某些逻辑中构建对应的事件,以便记录上下文并异步发送到Spark Driver内部的消息总线LiveListenerBus中。而且会有特定的Listener在总线的队列中监听特定的事件,当SQL被解析完毕并且准备开始执行的时候,会发出SparkListenerSQLExecutionStart事件。该事件中包含了SQL的物理计划执行图,像SparkPlanInfo,Driver内部的SQLAppStatusListener会监听这个事件并根据SparkPlanInfo准备将来做SQL Physical Plan后端渲染的数据。

因此,我们在SQL执行前物理计划树的遍历阶段记录下每个算子和对应RDD的上下文信息(图6),并以事件的形式发送到消息总线中,再由我们定制化的Listener监听捕捉并和已有的物理计划图进行整合即可(图7)。

图6(点击可查看大图)

图7(点击可查看大图)

而在每个stage开始执行和执行完毕的时候,也会发出相应的事件,这些事件中就包含了上文提到的各种统计信息和额外被注入的诊断信息。因此这些事件也会被监听,并用来update当前我们定制化的DAG图的状态。以一个实际生产中的案例作为参考,原始DAG图(图8)和经过定制化后的DAG图(图9)分别如下所示:

图8 社区版DAG图

(点击可查看大图)

图9 经过定制后的DAG图

(点击可查看大图)

此案例是在实际生产中两张表进行join的时候产生了数据倾斜,可见社区版原始的DAG图中只展示了少量的和内存以及数据量等相关的信息,而这些数据并不足以帮助我们观察出内在的性能问题。反观经过定制化的DAG图,可以看到在这个阶段运行过程中检测到了数据倾斜,并且显示倾斜发生在join操作上,还提示了join的字段为user_id,并同时计算出了倾斜最严重的值为1。而这个倾斜最严重的值在相关业务场景中其实是脏数据,于是用户根据这个信息更改了SQL,将user_id为1的数据filter掉了,从而大大减少了shuffle时候的数据量,大幅缩短了整体的运行时间,解决了内存溢出的问题。

5. 线上效果

5.1 减少事故发生率

在数据平台上上线这个定制化DAG图的Spark版本后,来自数据分析师和产品经理的support request变少了,同时许多数据开发人员在Job正式上线到生产环境之前,会通过数据平台运行生产中的Job并使用这个DAG图来分析执行过程中的性能问题,从而提前采取措施来对Job进行优化,减少了生产中的事故。

5.2 性能影响

需要注意的是,开启了自动计算倾斜 key值后,Job性能会有一定的下降,最坏的情况下整体Job的执行时间相较于不开启的时候多了30%左右,同时整体的CPU时间也提升了一些,性能影响最小的一个Job整体时间大概增加了5%。总体趋势是当Spark开启了DynamicAllocation之后,随着数据量的增大,对性能的影响会逐渐加强,这主要是由于额外的统计分析计算消耗了CPU时间,而且由于需要关闭特定算子的Codegen,所以相对于之前会产生更多的虚函数调用。此外,为了从Yarn申请更多的container来启动Executor运行额外的TaskSet,也增加了更多的调度时间。

6. 未来优化方向及社区跟进

当前DAG图的内部实现原理还有很多可以优化的地方,比如在计算极大倾斜的key的阶段,我们可以使用采样的方法来替代全体扫描。因为对于大部分情况来说,倾斜的key往往是少部分,采样虽然存在误差,但是基本能够找出发生了问题的key,这样可以节省不少CPU时间。同时也可以根据当前采集到的信息对SQL的物理执行过程进行动态优化,比如可以结合社区最新版本的AdaptiveExecution功能,做更加深入的动态调优从而提升整体的执行性能。在开发定制化DAG的时候Spark 3.0还没有正式发布,在3.0版本中社区对DAG图也做了更进一步的优化和增强,未来会考虑与其进行整合。

猜你喜欢

1、

2、

3、

4、

过往记忆大数据微信群,请添加微信:fangzhen0219,备注【进群】

转载地址:https://iteblog.blog.csdn.net/article/details/106654527 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:HBase 不停机升级在滴滴的实践
下一篇:当当年中庆典,力度超前,据说他花200买了10本书!

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年04月09日 02时53分13秒