大数据篇--Spark常见面试题总结一
发布日期:2021-06-29 17:21:13 浏览次数:2 分类:技术文章

本文共 24324 字,大约阅读时间需要 81 分钟。

文章目录

一、Spark 概念、模块

1.相关概念:

Application:Appliction都是指用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。

Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,通常用SparkContext代表Driver。
Executor:某个Application运行在worker节点上的一个进程, 该进程负责运行某些Task, 并且负责将数据存到内存或磁盘上,每个Application都有各自独立的一批Executor, 在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象, 负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task,这个每一个oarseGrainedExecutor Backend能并行运行Task的数量取决与分配给它的cpu个数。
Cluter Manager:指的是在集群上获取资源的外部服务,目前有三种类型。Standalon:spark原生的资源管理,由Master负责资源的分配。Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架。Hadoop Yarn:主要是指Yarn中的ResourceManager。
Worker:集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点。
Task:被送到某个Executor上的工作单元,但hadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责。
Job:包含多个Task组成的并行计算,往往由Spark Action触发生成, 一个Application中往往会产生多个Job。
Stage:每个Job会被拆分成多组Task, 作为一个TaskSet, 其名称为Stage,Stage的划分和调度是有DAGScheduler来负责的,Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方。
DAGScheduler:根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法。
TASKSedulter:将TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的. TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。在不同运行模式中任务调度器具体为:Spark on Standalone模式为TaskScheduler;YARN-Client模式为YarnClientClusterScheduler;YARN-Cluster模式为YarnClusterScheduler。

  将这些术语串起来的运行层次图如下:Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency。

在这里插入图片描述
附:

2.基本模块:

整个 Spark 主要由以下模块组成:

  • Spark Core:Spark的核心功能实现,包括:
    • 基础设施:Spark 中有很多基础设施,被 Spark 中的各种组件广泛使用,这些基础设施包括 SparkConf、Spark 内置 RPC 框架、事件总线 ListenerBus、度量系统。
    • SparkContext:SparkContext 是 Spark 的入口,Spark 程序的提交与执行离不开 SparkContext。它隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务、Web UI 等内容,开发者只需要使用 SparkContext 提供的 API 完成功能开发。
    • SparkEnv:Spark 执行环境。SparkEnv 内部封装了 RPC 环境(RpcEnv)、序列化管理器、广播管理器(BroadcastManager)、map任务输出跟踪器(MapOutputTracker)、存储体系、度量系统(MetricsSystem)、输出提交协调器(OutputCommitCoordinator)等Task运行所需的各种组件。
    • 存储体系:它优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘 I/O,提升了任务执行的效率,使得 Spark 适用于实时计算、迭代计算、流式计算等场景。Spark 的内存存储空间和执行存储空间之间的边界是可以控制的。
    • 调度系统:调度系统主要由 DAGScheduler 和 TaskScheduler 组成。DAGScheduler 负责创建 Job、将 DAG 中的 RDD 划分到不同的 Stage、给 Stage 创建对应的 Task、批量提交 Task 等功能。TaskScheduler 负责按照 FIFO 或者 FAIR 等调度算法对批量 Task 进行调度。
    • 计算引擎等:计算引擎由内存管理器、任务内存管理器、Task、Shuffle 管理器等组成。
  • Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询。此外,还为熟悉 Hive 开发的用户提供了对 Hive SQL 的支持。
  • Spark Streaming:提供流式计算处理能力,目前支持 Apache Kafka、Apache Flume、Amazon Kinesis 和简单的 TCP 套接字等多种数据源。此外,Spark Streaming 还提供窗口操作用于对一定周期内的流数据进行处理。
  • GraphX:提供图计算处理能力,支持分布式,Pregel 提供的 API 可以解决图计算中的常见问题。
  • MLlib:Spark 提供的机器学习库。MLlib 提供了机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的 API 接口大大降低了用户的学习成本。

Spark SQL、Spark Streaming、GraphX、MLlib的能力都是建立在Spark-core核心模块之上:

在这里插入图片描述

二、Spark作业提交流程是怎么样的

  1. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。
  2. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
  3. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。
  4. Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
  5. 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
  6. DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。
  7. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
  8. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)。

三、Spark on YARN两种方式的区别以及工作流程

参考官网:

1.Yarn组件简介:

  • ResourceManager:负责整个集群的资源管理和资源分配
  • NodeManager:每个节点的资源和任务的管理器,负责启动和停止Container,并监视资源使用情况
  • ApplicationMaster:Yarn中每个Application对应一个AM进程,获取资源后告诉NodeManager为其分配并启动Container
  • Container:Yarn中的抽象资源,它封装了某个节点上一定量的资源(CPU和内存两类资源),用来启动运行task。
    在这里插入图片描述
    YARN中的应用程序的提交处理,可以依次分为以下步骤:
    (1)用户通过client向RM提交应用程序
    (2)RM在接收到一个新的应用程序后,会先选择一个container用于启动该应用程序特有的AM
    (3)AM启动后,向RM请求运行应用程序所需要的资源
    (4)RM会尽可能地分配AM所请求的资源的容器,表达为container容器ID和主机名
    (5)AM根据给定的容器ID和主机名,要求对应的NodeManager使用这些资源启动一个特定于应用程序的任务。
    (6)NodeManager启动任务,并监视该任务使用资源的健康状况。
    (7)AM持续监视任务的执行情况。
    (8)当任务执行完成时,AM会向RM汇报并注销执行任务的容器,以及注销自己。

2.Spark On Yarn的优势:

在这里插入图片描述

  每个Spark executor作为一个YARN容器(container)运行。Spark可以使得多个Tasks在同一个容器(container)里面运行。

  • Spark支持资源动态共享,运行于Yarn的框架都共享一个集中配置好的资源池。
  • 可以很方便的利用Yarn的资源调度特性来做分类、隔离以及优先级控制负载,拥有更灵活的调度策略。
  • Yarn可以自由地选择executor数量。
  • Yarn是唯一支持Spark安全的集群管理器,使用Yarn,Spark可以运行于Kerberized Hadoop之上,在它们进程之间进行安全认证。

3.Spark on yarn cluster 模式:

  应用的运行结果不能在客户端显示(可以在history server中查看),所以最好将结果保存在HDFS而非stdout输出,客户端的终端显示的是作为YARN的job的简单运行状况,下图是yarn-cluster模式

在这里插入图片描述

  • Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等。
  • ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化。
  • ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束。
  • 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,而Executor对象的创建及维护是由CoarseGrainedExecutorBackend负责的,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等。
  • ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
  • 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

4.Spark on yarn client 模式:

  在Yarn-client中,Driver运行在Client上,通过ApplicationMaster向RM获取资源。本地Driver负责与所有的executor container进行交互,并将最后的结果汇总。结束掉终端,相当于kill掉这个spark应用。因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是http://hadoop1:4040访问,而YARN通过http:// hadoop1:8088访问

在这里插入图片描述

  • Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend。
  • ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派。
  • Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container)。
  • 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task。
  • client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
  • 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。

  因为是与Client端通信,所以Client不能关闭。客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver- memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显 示,Driver以进程名为SparkSubmit的形式存在。

参考:

5.这两种模式的区别:

  • SparkContext初始化不同,这也导致了Driver所在位置的不同,Yarn-Cluster的Driver是在集群的某一台NM(NodeManager)上,Yarn-Client的Driver运行在客户端(就是在ResourceManager的机器上)。
  • 而Driver会和Executors进行通信,这也导致了Yarn-Cluster在提交App之后可以关闭Client,而Yarn-Client不可以。
  • 最后再来说应用场景,yarn-client用于测试,因为driver运行在本地客户端,负责调度application,会与yarn集群产生超大量的网络通信,从而导致网卡流量激增,增加网络负荷,好处在于,直接执行时,本地可以看到所有的log,方便调试;yarn-cluser,用于生产环境,因为dirver运行在nodemanager,没有网卡流量激增问题,缺点在于,调试不方便,本地用spark-submit提交后,看不到log,只能通过yarn appication logs applicaiton_id这种命令来查看,很麻烦。
    在这里插入图片描述

四、Spark内存管理

  我们都知道 Spark 能够有效的利用内存并进行分布式计算,其内存管理模块在整个系统中扮演着非常重要的角色。为了更好地利用 Spark,深入地理解其内存管理模型具有非常重要的意义,这有助于我们对 Spark 进行更好的调优;在出现各种内存问题时,能够摸清头脑,找到哪块内存区域出现问题。

  在执行Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能。由于 Driver 的内存管理相对来说较为简单,下文介绍的内存模型全部指 Executor 端的内存模型。

  作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

1.堆内内存(On-heap Memory):

默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:

Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据;
Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息;
预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。

整个 Executor 端堆内内存如果用图来表示的话,可以概括如下:

在这里插入图片描述
  堆内内存有300M的保留资源,此外的可用内存usableMemory被分为spark管理的内存和用户管理的内存两部分,spark管理的内存通过spark.memory.fraction进行控制,默认0.6。

systemMemory = Runtime.getRuntime.maxMemory,其实就是通过参数 spark.executor.memory--executor-memory 配置的。

reservedMemory 在 Spark 2.2.1 中是写死的,其值等于 300MB,这个值是不能修改的(如果在测试环境下,我们可以通过 spark.testing.reservedMemory 参数进行修改);

object UnifiedMemoryManager {
// Set aside a fixed amount of memory for non-storage, non-execution purposes. // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default. private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

usableMemory = systemMemory - reservedMemory,这个就是 Spark 可用内存;

在这里插入图片描述

2.堆外内存(Off-heap Memory):

  Spark 1.6 开始引入了Off-heap memory(详见)。这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存,由于这种方式不进过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。

  默认情况下,堆外内存是关闭的,我们可以通过 spark.memory.offHeap.enabled 参数启用,并且通过 spark.memory.offHeap.size 设置堆外内存大小,单位为字节。如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。相比堆内内存,堆外内存只区分 Execution 内存和 Storage 内存,其内存分布如下图所示:

在这里插入图片描述
上图中的 maxOffHeapMemory 等于 spark.memory.offHeap.size 参数配置的。

3.Execution 内存和 Storage 内存动态调整:

  在 Spark 1.5 之前,Execution 内存和 Storage 内存分配是静态的,换句话说就是如果 Execution 内存不足,即使 Storage 内存有很大空闲程序也是无法利用到的;反之亦然。这就导致我们很难进行内存的调优工作,我们必须非常清楚地了解 Execution 和 Storage 两块区域的内存分布。而目前 Execution 内存和 Storage 内存可以互相共享的。也就是说,如果 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 可以从 Storage 中申请空间;反之亦然。所以上图中的虚线代表 Execution 内存和 Storage 内存是可以随着运作动态调整的,这样可以有效地利用内存资源。Execution 内存和 Storage 内存之间的动态调整可以概括如下:

在这里插入图片描述

  • 程序提交的时候我们都会设定基本的 Execution 内存和 Storage 内存区域(通过 spark.memory.storageFraction 参数设置);
  • 在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU 规则进行的。若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
  • Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用。

注意:上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

4.内存管理接口:

  Spark 为存储内存和执行内存的管理提供了统一的接口——MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存::在调用这些方法时都需要指定其内存模式(MemoryMode),这个参数决定了是在堆内还是堆外完成这次操作。

  MemoryManager 的具体实现上,Spark 1.6 之后默认为统一管理()方式,1.6 之前采用的静态管理()方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。

5.Task 之间内存分布:

  为了更好地使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存。当 Task 需要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用情况,如果没有,则将这个 Task 内存使用置为0,并且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。之后为这个 Task 申请 numBytes 内存,如果 Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,然后返回;如果当前 Execution 内存区域无法申请到每个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。每个 Task 可以使用 Execution 内存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。一个 Task 能够运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可以使用全部的 Execution 内存。

  比如如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 可以申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

参考:

6.存储内存管理:

参考第三大节的第6小节

7.执行内存管理:

参考:

(1)多任务间内存分配:

  Executor 内运行的任务同样共享执行内存,Spark 用一个 HashMap 结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的任务的个数。每个任务在启动之时,要向 MemoryManager 请求申请最少为 1/2N 的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。

(2)Shuffle 的内存占用:

  执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程,我们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的使用:

Shuffle Write:

  • 若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
  • 若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。

Shuffle Read:

  • 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
  • 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。

  在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

  Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划,解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序。Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是这个内存块在系统内存中的 64 位绝对地址。Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。

Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成:

  • 页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。
  • 页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。

  有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和 CPU 使用效率带来了明显的提升。

  Spark 的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。

五、RDD

参考官网:

  在实际开发应用中,存在许多迭代式计算,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。以前常用的MapReduce框架是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。如果有一种方法,能将结果保存在内存当中,就可以大量减少IO消耗。RDD一种弹性分布数据集,就是为了满足这种需求而实现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理。不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销。

  RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、只读的、被分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

1.RDD概念:

  org.apache.spark.rdd.RDD 类源代码中有详细的注释:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

翻译:弹性的 分布式 数据集是 Spark 基础的抽象。
解释:弹性的(可复原的),说明数据集具有容错性、可修复性。 分布式,说明数据集可以分布在不同的机器上。

Represents an immutable, partitioned collection of elements that can be operated on in parallel.

翻译:RDD 是不可变的 分区的 可并行处理的 元素集合。
解释:不可变的,这和 Scala 的设计理念相同,数据集一旦构建完成,就不能再修改,这样能轻松解决多个线程读数据的一致性问题。 分区的=可并行处理的=分布式。

This class contains the basic operations available on all RDDs, such as map, filter, and persist.

翻译:这个抽象类包含了所有 RDD 都应该有的基本操作,比如 map 、filter 、persist等。
解释:这三个操作分别是:批量转换、筛选、持久化。

In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as groupByKey and join;

翻译:另外 PairRDDFunctions 对象中包含了 键值对型(KV型) RDD 的操作,例如 groupByKey和 join;
解释:KV 型可以支持按 Key 分组、关联等操作。

[[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles;

翻译:DoubleRDDFunctions提供可 double 数据集的操作;
解释:数值型数据集有求和、平均、分布图等统计性操作。

and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.

翻译:SequenceFileRDDFunctions 提供了顺序存储操作。

All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.

翻译:所有的的类通过隐式转换自动地用于RDD实例中
解释:RDD 伴生对象里包含了隐式转换函数,用implicit 修饰。隐式转换是 Scala 的语法特性。

Internally, each RDD is characterized by five main properties:

翻译:在 RDD 中,包含这样的5个属性(也就说要实现抽象方法或给空对象赋值):

  • A list of partitions(翻译:一个分区的列表(getPartitions))
    解释:它是一组分区,分区是spark中数据集的最小单位。也就是说spark当中数据是以分区为单位存储的,不同的分区被存储在不同的节点上。这也是分布式计算的基础。
  • A function for computing each split(翻译:一个用于计算分区中数据的函数(compute))
    解释:一个应用在各个分区上的计算任务。在spark当中数据和执行的操作是分开的,并且spark基于懒计算的机制,也就是在真正触发计算的行动操作出现之前,spark会存储起来对哪些数据执行哪些计算。数据和计算之间的映射关系就存储在RDD中。
  • A list of dependencies on other RDDs(翻译:一个对其他 RDD 的依赖列表(getDependencies))
    解释:RDD之间的依赖关系,RDD之间存在转化关系,一个RDD可以通过转化操作转化成其他RDD,这些转化操作都会被记录下来。当部分数据丢失的时候,spark可以通过记录的依赖关系重新计算丢失部分的数据,而不是重新计算所有数据。
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(翻译:可选:KV 型 RDD 应该有一个分区器,例如 hash-分区器(partitioner))
    解释:一个分区的方法,也就是计算分区的函数。spark当中支持基于hash的hash分区方法和基于范围的range分区方法。
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)(翻译:可选:分区数据计算完后优先存储的位置,例如 HDFS 的某个块(getPreferredLocations))
    解释:一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。

  通过以上五点,我们可以看出spark一个重要的理念。即移动数据不如移动计算,也就是说在spark运行调度的时候,会倾向于将计算分发到节点,而不是将节点的数据搜集起来计算。RDD正是基于这一理念而生的,它做的也正是这样的事情。

2.创建RDD:

  spark中提供了两种方式来创建RDD,一种是读取外部的数据集,另一种是将一个已经存储在内存当中的集合进行并行化。其他方式还有:读取数据库等等其他的操作。也可以生成RDD;RDD可以通过其他的RDD转换而来的。

  最简单的方式当然是并行化,因为这不需要外部的数据集,可以很轻易地做到。在此之前,我们先来看一下SparkContext的概念,SparkContext是整个spark的入口,相当于程序的main函数。在我们启动spark的时候,spark已经为我们创建好了一个SparkContext的实例,命名为sc,我们可以直接访问到。

  我们要创建RDD也需要基于sc进行,比如下面我要创建一个有字符串构成的RDD:

texts = sc.parallelize(['now test', 'spark rdd'])

  除了parallelize之外呢,我们还可以由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,比如我想从一个文件读入,可以使用sc当中的textFile方法获取:

在这里插入图片描述
  一般来说,除了本地调试我们很少会用parallelize进行创建RDD,该方法可以方便我们在spark-shell中快速创建出需要的RDD,因此在我们学习Spark时经常用到,但在实际应用场景中,这种方法使用的极少。

3.转化操作和行动操作:

  RDD支持两种操作,一种叫做转化操作(Transformation)一种叫做行动操作(Action)。

  主要做的是就是将一个已有的RDD生成另外一个RDD。Transformation具有lazy特性(延迟加载)。Transformation算子的代码不会真正被执行。只有当我们的程序里面遇到一个action算子的时候,代码才会真正的被执行。这种设计让Spark更加有效率地运行。

常用的Transformation:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numPartitions]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作
sortByKey([ascending], [numPartitions]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活 第一个参数是根据什么排序 第二个是怎么排序 false倒序 第三个排序后分区数 默认与原RDD一样
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD 相当于内连接(求交集)
cogroup(otherDataset, [numPartitions]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 两个RDD的笛卡尔积 的成很多个K/V
pipe(command, [envVars]) 调用外部程序
coalesce(numPartitions) 重新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 false
repartition(numPartitions) 重新分区 必须shuffle 参数是要分多少区 少变多
repartitionAndSortWithinPartitions(partitioner) 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作
foldByKey(zeroValue)(seqOp) 该函数用于K/V做折叠,合并处理 ,与aggregate类似 第一个括号的参数应用于每个V值 第二括号函数是聚合例如:+
combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
partitionBy(partitioner) 对RDD进行分区 partitioner是分区器 例如new HashPartition(2)
cache/persist RDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY ,而persist则可以选择缓存级别

  触发代码的运行,我们一段spark代码里面至少需要有一个action操作。常用的Action:

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement, num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) (Java and Scala) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
aggregate 先对分区进行操作,在总体操作

4.RDD的宽依赖和窄依赖:

参考:

5.RDD的执行过程:

参考:

  1. RDD读入外部数据源(或者内存中的数据集)进行创建;注意:RDD读取数据时,一般默认2个分区。
  2. RDD经过一系列的“转换”操作。每一次都会产生不同的RDD,供给下一个“转换”使用;
  3. 最后一个RDD经“行动”操作进行处理,并输出到外部数据源,或者变成Scala/Java集合或变量

  值得注意的是,RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

RDD血缘依赖转换流程:

在这里插入图片描述
  从数据输入,到逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F,也是一个RDD。之所以说是逻辑上,是因为这时候计算并没有发生,只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“行动”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。

  这一处理过程:称为一个“血缘关系(Lineage)”,即DAG拓扑排序的结果。

  Spark采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据。因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。

  同时。这种通过血缘关系就是把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。

6.存储内存管理:

(1)RDD 的持久化机制:

  弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换(Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时,Spark 才会创建任务读取 RDD,然后真正触发转换的执行。

  Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理 (存储内存的其他应用场景,如缓存 broadcast 数据,暂时不在本文的讨论范围之内)。

  RDD 的持久化由 Spark 的 Storage 模块 负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。

Storage 模块示意图:

在这里插入图片描述
  在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的存储级别(可参考),而存储级别是以下 5 个变量的组合:

// 存储级别:class StorageLevel private(	private var _useDisk: Boolean, //磁盘	private var _useMemory: Boolean, //这里其实是指堆内内存	private var _useOffHeap: Boolean, //堆外内存	private var _deserialized: Boolean, //是否为非序列化	private var _replication: Int = 1 //副本个数)

通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:

  • 存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。
  • 存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。
  • 副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本。
(2)RDD 缓存的过程:

  RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器()的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不连续。

  RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为"展开"(Unroll)。Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。

  因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图Spark Unroll示意图所示:

在这里插入图片描述
  在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

(3)淘汰和落盘:

  由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。

存储内存的淘汰规则为:

  • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
  • 新旧 Block 不能属于同一个 RDD,避免循环淘汰
  • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
  • 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。

  落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

参考:

转载地址:https://blog.csdn.net/m0_37739193/article/details/117464632 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:大数据篇--Spark常见面试题总结二
下一篇:大数据篇--数据倾斜

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年04月20日 13时55分29秒