java并行出来文件_将多个文件并行处理为独立的RDD
发布日期:2021-06-24 12:43:41 浏览次数:2 分类:技术文章

本文共 894 字,大约阅读时间需要 2 分钟。

这是一个想法而不是一个完整的解决方案,我还没有测试过它 .

您可以从将数据处理管道提取到函数中开始 .

def pipeline(f: String, n: Int) = {

sqlContext

.read

.format("com.databricks.spark.csv")

.option("header", "true")

.load(f)

.repartition(n)

.groupBy(...)

.agg(...)

.cache // Cache so we can force computation later

}

如果您的文件很小,您可以调整 n 参数以使用尽可能少的分区来适应单个文件中的数据并避免混乱 . 这意味着你限制了并发性,但我们稍后会回到这个问题 .

val n: Int = ???

接下来,您必须获取输入文件列表 . 此步骤取决于数据源,但大多数情况下它或多或少是直接的:

val files: Array[String] = ???

接下来,您可以使用 pipeline 函数映射上面的列表:

val rdds = files.map(f => pipeline(f, n))

由于我们通过提交多个作业来限制单个文件级别的并发性 . 让我们添加一个简单的帮助器,强制评估并用 Future 包裹它

import scala.concurrent._

import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {

df.rdd.foreach(_ => ()) // Force computation

df

}

最后我们可以在 rdds 上使用上面的帮助:

val result = Future.sequence(

rdds.map(rdd => pipelineToFuture(rdd)).toList

)

根据您的要求,您可以添加 onComplete 回调或使用反应流来收集结果 .

转载地址:https://blog.csdn.net/weixin_32820131/article/details/114758711 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:公共界面java_14web案例篇-企业项目页面公共模块的抽取方式
下一篇:java synchronized boolean_使用Boolean类型同步锁引起异常的分析

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月27日 17时33分21秒