Apache Spark 3.0 中的向量化 IO
发布日期:2021-06-30 11:28:39 浏览次数:2 分类:技术文章

本文共 2757 字,大约阅读时间需要 9 分钟。

R 是数据科学中最流行的计算机语言之一,专门用于统计分析和一些扩展,如用于数据处理和机器学习任务的 RStudio addins 和其他 R 包。此外,它使数据科学家能够轻松地可视化他们的数据集。

通过在 Apache Spark 中使用 SparkR,可以很容易地扩展 R 代码。要交互式地运行作业,可以通过运行 R shell 轻松地在分布式集群中运行 R 的作业。

当 SparkR 不需要与 R 进程交互时,其性能实际上与 Scala、Java 和 Python 等其他语言 API 相同。但是,当 SparkR 作业与本机 R 函数或数据类型交互时,会性能显著下降。

如果在 Spark 和 R 之间使用 Apache Arrow 来进行数据交换,其性能会有很大的提升。这篇博客文章概述了 SparkR 中 Spark 和 R 的交互,并对比了没有向量化执行和有向量化执行的性能差异。

Spark 和 R 交互

SparkR 不仅支持丰富的 ML 和类似 SQL 的 API 集合,而且还支持用于直接与 R 代码进行交互的一组 API。例如,Spark DataFrame 和 R DataFrame 之间的无缝转换以及在 Spark DataFrame 上以分布式的方式执行 R 内置函数。

在大多数情况下,Spark 中的其他语言 API 之间的性能实际上是一致的——例如,当用户代码依赖于 Spark UDF 或者 SQL API 时,执行过程完全在 JVM 中进行, I/O 方面没有任何性能损失。比如下面的两种调用时间都只需要一秒:

// Scala API// ~1 secondsql("SELECT id FROM range(2000000000)").filter("id > 10").count()# R API# ~1 secondcount(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))

但是,在需要执行 R 的内置函数或将其从 R 内置类型转换到其他语言类型的情况下,其性能将有很大不同,如下所示。

// Scala APIval ds = (1L to 100000L).toDS// ~1 secondds.mapPartitions(iter => iter.filter(_ < 50000)).count()# R APIdf <- createDataFrame(lapply(seq(100000), function (e) list(value=e)))# ~15 seconds - 15 times slowercount(dapply(df, function(x) as.data.frame(x[x$value < 50000,]), schema(df)))

上面其实仅仅是对每个分区中过滤出小于 50000 的数据,然后对其进行 count 操作,但是 SparkR 却比 Scala 编写的代码慢 15 倍!

// Scala API// ~0.2 secondsval df = sql("SELECT * FROM range(1000000)").collect()# R API# ~8 seconds - 40 times slowerdf <- collect(sql("SELECT * FROM range(1000000)"))

上面这个例子情况更糟糕,其仅仅是将数据收集到 Driver 端,但是 SparkR 比 Scala 要慢 40 倍!

这是因为上面计算需要与 R 内置函数或数据类型交互的 API ,但是其实现效率不高。在 SparkR 中类似的函数还有六个:

createDataFrame()collect()dapply()dapplyCollect()gapply()gapplyCollect()

简单来说,createDataFrame() 和 collect() 需要在 JVM 和 R 之间进行序列化/反序列化,并且对数据进行转换,比如 Java 中的字符串需要转换成 R 中的 character。

原始实现(Native implementation)

上图中 SparkR DataFrame 的计算是分布在 Spark 集群上所有可用的节点上。如果不需要将数据以 R 的 data.frame 进行收集(collect)或不需要执行 R 内置函数,则在 Driver 或 executor 端不需要与 R 进程进行通信。但是当它需要使用 R 的 data.frame 或使用 R 的内置函数时,需要 Driver 或 executor 使用 sockets 使得 JVM 和 R 进行通信。

这需要在 JVM 和 R 直接对交换的数据进行序列化和反序列化操作,而这个操作的编码格式非常低效,完全没有考虑到现代 CPU 的设计,比如 CPU pipelining。

向量化执行(Vectorized implementation)

在 Apache Spark 3.0 中,SparkR 中引入了一种新的向量化(vectorized)实现,它利用 Apache Arrow 直接在 JVM 和 R 之间交换数据,且(反)序列化成本非常小,具体如下:

 新的实现方式并没有在 JVM 和 R 之间使用低效的格式对数据逐行进行(反)序列化,而是利用 Apache Arrow 以高效的列格式进行流水线处理和单指令多数据(SIMD)。

新的矢量化 SparkR API 默认情况下未启用,但可以通过在 Apache Spark 3.0 中将 spark.sql.execution.arrow.sparkr.enabled 设置为 true 来启用。注意,dapplyCollect() 和 gapplyCollect() 矢量化操作尚未实现。建议使用 dapply() 和 gapply() 来替代。

基准测试结果

下面的基准测试使用的数据集为 500,000 条记录。分别测试使用和未使用矢量化的执行时间:

使用矢量化优化之后,collect() 和 createDataFrame() 性能分别大致提升 17 倍和 42x 倍;而对 dapply() 和 gapply(), 分别提升了43x 和 33x 。

从上面的启发可以看到,如果我们需要在不同系统之间进行数据交互,也可以使用 Apache Arrow。

猜你喜欢

1、

2、

3、

4、

过往记忆大数据微信群,请添加微信:fangzhen0219,备注【进群】

转载地址:https://iteblog.blog.csdn.net/article/details/107273924 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Apache Flink 1.11.0 重要功能全面解析
下一篇:Spark 3.0 中七个必须知道的 SQL 性能优化

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年05月04日 05时14分39秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

rnn 梯度消失爆炸 2019-04-30
NLP jieba分词源码解析 2019-04-30
leetcode 3 2019-04-30
linux man 2019-04-30
linux 命令综述 2019-04-30
linux 网络命令 2019-04-30
leetcode 94 2019-04-30
mit 6.824 2019-04-30
linux iconv 2019-04-30
python 读取文件 2019-04-30
linux nc 2019-04-30
statistics conjugate 2019-04-30
ctr平滑 2019-04-30
Hadoop原理 2019-04-30
spark 数据存储 2019-04-30
矩阵分解 2019-04-30
fm ffm模型 2019-04-30
hive udf 2019-04-30
java修饰符 2019-04-30
redis 2019-04-30