本文共 894 字,大约阅读时间需要 2 分钟。
这是一个想法而不是一个完整的解决方案,我还没有测试过它 .
您可以从将数据处理管道提取到函数中开始 .
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
如果您的文件很小,您可以调整 n 参数以使用尽可能少的分区来适应单个文件中的数据并避免混乱 . 这意味着你限制了并发性,但我们稍后会回到这个问题 .
val n: Int = ???
接下来,您必须获取输入文件列表 . 此步骤取决于数据源,但大多数情况下它或多或少是直接的:
val files: Array[String] = ???
接下来,您可以使用 pipeline 函数映射上面的列表:
val rdds = files.map(f => pipeline(f, n))
由于我们通过提交多个作业来限制单个文件级别的并发性 . 让我们添加一个简单的帮助器,强制评估并用 Future 包裹它
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
最后我们可以在 rdds 上使用上面的帮助:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
根据您的要求,您可以添加 onComplete 回调或使用反应流来收集结果 .
转载地址:https://blog.csdn.net/weixin_32820131/article/details/114758711 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!