sparkcore分区_7.spark core之数据分区
发布日期:2021-10-31 09:14:58 浏览次数:2 分类:技术文章

本文共 8028 字,大约阅读时间需要 26 分钟。

简介

spark一个最重要的特性就是对数据集在各个节点的分区进行控制。控制数据分布可以减少网络开销,极大地提升整体性能。

只有Pair RDD才有分区,非Pair RDD分区的值是None。如果RDD只被扫描一次,没必要预先分区处理;如果RDD多次在诸如连接这种基于键的操作中使用时,分区才有作用。

分区器

分区器决定了RDD的分区个数及每条数据最终属于哪个分区。

spark提供了两个分区器:HashPartitioner和RangePartitioner,它们都继承于org.apache.spark.Partitioner类并实现三个方法。numPartitions: Int: 指定分区数

getPartition(key: Any): Int: 分区编号(0~numPartitions-1)

equals(): 检查分区器对象是否和其他分区器实例相同,判断两个RDD分区方式是否一样。

HashPartitioner分区

HashPartitioner分区执行原理:对于给定的key,计算其hashCode,再除以分区数取余,最后的值就是这个key所属的分区ID。实现如下:class HashPartitioner(partitions: Int) extends Partitioner {

require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {    case null => 0

case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)

}

override def equals(other: Any): Boolean = other match {    case h: HashPartitioner =>

h.numPartitions == numPartitions    case _ =>      false

}

override def hashCode: Int = numPartitions}

RangePartitioner分区

HashPartitioner分区可能导致每个分区中数据量的不均匀。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。

RangePartitioner分区执行原理:计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量。

根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePrePartition

调用RangePartitioner的sketch函数进行数据抽样,计算出每个分区的样本。

计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜。

对于数据量比较多的RDD分区调用RDD的sample函数API重新进行数据抽取。

将最终的样本数据通过RangePartitoner的determineBounds函数进行数据排序分配,计算出rangeBounds。class RangePartitioner[K: Ordering : ClassTag, V](

partitions: Int,

rdd: RDD[_ <:>

extends Partitioner {  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.

require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")  // 获取RDD中K类型数据的排序器

private var ordering = implicitly[Ordering[K]]  // An array of upper bounds for the first (partitions - 1) partitions

private var rangeBounds: Array[K] = {    if (partitions <= 1) {      // 如果给定的分区数小于等于1的情况下,直接返回一个空的集合,表示数据不进行分区

Array.empty

} else {      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.

// 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据

val sampleSize = math.min(20.0 * partitions, 1e6)      // Assume the input partitions are roughly balanced and over-sample a little bit.

// RDD各分区中的数据量可能会出现倾斜的情况,乘于3的目的就是保证数据量小的分区能够采样到足够的数据,而对于数据量大的分区会进行第二次采样

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt      // 从rdd中抽取数据,返回值:(总rdd数据量, Array[分区id,当前分区的数据量,当前分区抽取的数据])

val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)      if (numItems == 0L) {        // 如果总的数据量为0(RDD为空),那么直接返回一个空的数组

Array.empty

} else {        // If a partition contains much more than the average number of items, we re-sample from it

// to ensure that enough items are collected from that partition.

// 计算总样本数量和总记录数的占比,占比最大为1.0

val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)        // 保存样本数据的集合buffer

val candidates = ArrayBuffer.empty[(K, Float)]        // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)

val imbalancedPartitions = mutable.Set.empty[Int]        // 计算抽取出来的样本数据

sketched.foreach { case (idx, n, sample) =>          if (fraction * n > sampleSizePerPartition) {            // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取

imbalancedPartitions += idx

} else {            // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中

// The weight is 1 over the sampling probability.

val weight = (n.toDouble / sample.size).toFloat            for (key 

candidates += ((key, weight))

}

}

}        // 对于数据分布不均衡的RDD分区,重新进行数据抽样

if (imbalancedPartitions.nonEmpty) {          // Re-sample imbalanced partitions with the desired sampling probability.

// 获取数据分布不均衡的RDD分区,并构成RDD

val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)          // 随机种子

val seed = byteswap32(-rdd.id - 1)          // 利用rdd的sample抽样函数API进行数据抽样

val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()

val weight = (1.0 / fraction).toFloat

candidates ++= reSampled.map(x => (x, weight))

}        // 将最终的抽样数据计算出rangeBounds出来

RangePartitioner.determineBounds(candidates, partitions)

}

}

}  // 下一个RDD的分区数量是rangeBounds数组中元素数量+ 1个

def numPartitions: Int = rangeBounds.length + 1

// 二分查找器,内部使用java中的Arrays类提供的二分查找方法

private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]  // 根据RDD的key值返回对应的分区id。从0开始

def getPartition(key: Any): Int = {    // 强制转换key类型为RDD中原本的数据类型

val k = key.asInstanceOf[K]

var partition = 0

if (rangeBounds.length <= 128) {      // If we have less than 128 partitions naive search

// 如果分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标

while (partition 

partition += 1

}

} else {      // Determine which binary search method to use only once.

// 如果分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标;

// 但是如果k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比所有数据都大)

partition = binarySearch(rangeBounds, k)      // binarySearch either returns the match location or -[insertion point]-1

if (partition 

partition = -partition - 1

}      if (partition > rangeBounds.length) {

partition = rangeBounds.length

}

}    // 根据数据排序是升序还是降序进行数据的排列,默认为升序

if (ascending) {

partition

} else {

rangeBounds.length - partition

}

}

影响分区的算子操作

影响分区的算子操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(如果父RDD有分区方式)、flatMapValues()(如果父RDD有分区方式)。

对于执行两个RDD的算子操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中一个父RDD设置过分区方式,结果就采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD采用第一个父RDD的分区方式。

repartition和partitionBy的区别

repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner。但是二者之间的区别有:partitionBy只能用于Pair RDD

都作用于Pair RDD时,结果也不一样

repartition和partitionBy的区别.jpg

其实partitionBy的结果才是我们所预期的。repartition 其实使用了一个随机生成的数来当作 key,而不是使用原来的key。def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {

coalesce(numPartitions, shuffle = true)

}

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)

: RDD[T] = withScope {    if (shuffle) {      /** Distributes elements evenly across output partitions, starting from a random partition. */

val distributePartition = (index: Int, items: Iterator[T]) => {        var position = (new Random(index)).nextInt(numPartitions)

items.map { t =>          // Note that the hash code of the key will just be the key itself. The HashPartitioner

// will mod it with the number of total partitions.

position = position + 1

(position, t)

}

} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed

new CoalescedRDD(        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),        new HashPartitioner(numPartitions)),

numPartitions).values

} else {      new CoalescedRDD(this, numPartitions)

}

}

repartition和coalesce的区别

两个算子都是对RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)N

如果N>M并且N和M相差不多(假如N是1000,M是100),这时可以将shuffle设置为false,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。在shuffle为false的情况下,如果N

如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。

实例分析

需求

统计用户访问其未订阅主题页面的情况。用户信息表:由(UserID,UserInfo)组成的RDD,UserInfo包含该用户所订阅的主题列表。

事件表:由(UserID,LinkInfo)组成的RDD,存放着每五分钟内网站各用户访问情况。

代码实现val sc = new SparkContext()

val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist

def processNewLogs(logFileName:String){

val events = sc.sequenceFile[UserID, LinkInfo](logFileName)    //RDD of (UserID,(UserInfo,LinkInfo)) pairs

val joined = usersData.join(events)

val offTopicVisits = joined.filter {        // Expand the tuple into its components

case (userId, (userInfo, linkInfo)) =>

!userInfo.topics.contains(linkInfo.topic)

}.count()

println("Number of visits to non-subscribed opics: " + offTopicVisits)

}

缺点

连接操作会将两个数据集中的所有键的哈希值都求出来,将哈希值相同的记录通过网络传到同一台机器上,然后再对所有键相同的记录进行连接操作。userData表数据量很大,所以这样进行哈希计算和跨节点数据混洗非常耗时。

改进代码实现val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")

.partionBy(new HashPartiotioner(100))

.persist()

优点

userData表进行了重新分区,将键相同的数据都放在一个分区中。然后调用persist持久化结果数据,不用每次都计算哈希和跨节点混洗。程序运行速度显著提升。

作者:java大数据编程

链接:https://www.jianshu.com/p/f41afdd22c43

转载地址:https://blog.csdn.net/weixin_39945523/article/details/111730786 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:1 数列分块入门_「分块系列」数列分块入门1 解题报告
下一篇:1字符集 iso latin_附件一、ISO Latin-1字符集

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年03月06日 10时02分40秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

java 获取 html 图片路径_JAVA-替换html中图片的路径-从html代码中提取图片路径并下载(完整版)... 2019-04-21
java redis 面试题_Java面试题(Redis篇) 2019-04-21
java 正则表达式分类功能_JAVA正则表达式4种常用功能 2019-04-21
java3d立方体_3d立方体贴图 2019-04-21
java ajax教程_(转)JAVA AJAX教程第三章—AJAX详细讲解 2019-04-21
java operators_A guide to Java Operators 2019-04-21
java socket调试_JAVA实现SOCKET多客户端通信的案例 2019-04-21
java 使用或覆盖了已过时的api_JAVA使用或覆盖了已过时的 API 2019-04-21
java 图片旋转保存_Java 对图片90度旋转 2019-04-21
用java实现文学研究助手_数据结构文学研究助手 C语言代码实现(带源码+解析)... 2019-04-21
java gc的几种方式_GC 的三种基本实现方式 2019-04-21
wget linux java 32_通过wget在Linux上下载Java JDK会显示在许可证页面上 2019-04-21
babylonjs 设置面板位置_babylonjs 空间坐标转为屏幕坐标 2019-04-21
oracle里面如何查询sqlid,CSS_oracle中如何查看sql, --查询表状态:  select uo.O - phpStudy... 2019-04-21
oracle 查询中用case,oracle case when 在查询时候的用法。 2019-04-21
oracle正在运行的程序包,ORACLE PL/SQL编程详解之程序包的创建与应用 2019-04-21
php局部页面滚动,在访问另一页面后保留浏览器滚动位置 - php 2019-04-21
jmeter运行linux命令行,Jmeter在linux上运行(命令行运行Jmeter) 2019-04-21
linux服务器怎么添加站点,如何增加站点或虚拟主机及文件说明 2019-04-21
linux系统输入指令,Linux系统基础 - 基本操作命令 2019-04-21