数据挖掘工具---spark使用方法(一)
发布日期:2021-07-24 12:00:35 浏览次数:1 分类:技术文章

本文共 6126 字,大约阅读时间需要 20 分钟。

问题的起源

之前的集群计算系统都是基于非循环的数据流模型,即从稳定的物理存储系统加载记录,传给一组确定性操作构成的DAG,然后在将得到的结果写回存储系统。这种方式如果用在迭代计算中,或者是交互式查询中(即不断的在数据子集中筛选数据),此时会存在大量的读磁盘和写磁盘及网络传输。通信开销大,整个计算效率会很低。

RDD的提出

RDD(Resilient Distrubuted Dataset),即弹性分布式数据集。它主要通过两种手段克服上面提到的问题:一是允许将数据缓存到内存中,这样如果重用到此数据直接从内存读取即可;二是很多的RDD操作并不会马上运行出结果,而是将一系列转化记录下来即lineage。只有当遇到动作操作时才会出发计算。当然,lineage也能用于数据恢复。

我们可以从RDD的内部接口来窥探它的实现方式。

函数 功能
partitions() 返回一组partion对象(一组RDD分区),也就是数据集的原子组成部分,分区是以java对象的方式缓存的内存中的,当然只有在RDD动作操作的时候才会真正缓存和计算。一个数据集很大,会分成很多个数据块,一个数据块会放在几个节点上,数据块的大小,存放位置,其他信息由集群的资源管理器管控和调配。而分区是在数据块基础由RDD产生的,这个RDD可以了解,掌控和操作。
preferredLocations(p) 返回数据分区的存放位置,也就是在哪个节点访问数据最快 dependences() 返回对父RDD的一组依赖,这组依赖描述了RDD的lineage,依赖又分为窄依赖和宽依赖
dependences() 返回对父RDD的一组依赖,这组依赖描述了RDD的lineage,依赖又分为窄依赖和宽依赖
Iterator(p,parentIters) 按照父分区的迭代方式逐个计算分区p的元素
partitioner() 返回的是数据块的分区模式和数据块的位置,知道了分区模式和数据快位置就可以自己算出各分区的元素和位置
RDD只有转换操作和动作操作两种操作,转化操作只记录RDD之间的血统关系并不产生真正的计算;当动作操作触发真正的计算时,它是先由血统关系逐级往回,找到源头的RDD,这种源头可以是定义从节点磁盘上获取数据的RDD,也可以是中间已缓存到内存中的RDD,再由源头RDD逐级计算得到最终结果,而不需每一步都去计算和缓存。 转化操作或动作操作能够执行的基础是记录了RDD之间的依赖关系,依赖分为窄依赖和宽依赖,下图揭示了这两种依赖的景象。

区分这两种依赖:首先,窄依赖能够并行的在所有节点中进行,对于单个节点来讲,一系列的操作可以在节点内部以流水线的方式完成,如逐个元素进行map和filter,而宽依赖则需要计算好所有父分区数据,然后在节点之间shuffle,类似于reduce;其次,窄依赖更容易实现对故障节点的恢复,只需要计算丢失RDD分区的父分区,而且各节点可以并行进行,而宽依赖,单个节点的失效需要整体的重新计算。

熟悉各种RDD是窄依赖还是宽依赖非常重要,恰当选择合适的RDD组合来实现相同功能,会使应用的效率大大提升。

关于spark

spark是实现RDD这种思想的系统,并且可以用来开发多种并行应用。

要使用spark,必须编写驱动程序。一个驱动程序(也可以把它当成应用程序),包含一个到集群的连接,在python语言实现的驱动程序中用sparkcontext(),这个接口作用就跟连接数据库的jdbc一样或者文件读取的open()一样,我们可以对这个连接进行配置,比如连接到哪个集群;包含一系列的RDD,这些RDD是我们实现各种功能的基础;包含RDD上的动作,真正触发计算,缓存等操作,从而实现功能。

驱动程序的功能由集群中的worker来执行,在连接集群的时候会启动和运行这些worker,下图是一个简单示意图。

worker是长时运行的节点上的进程,worker从分布式文件系统读取数据块,并将计算后的RDD分区以java对象的形式缓存在内存中。worker之所以可以完成这些功能原因是spark系统把驱动程序的字节码分发到相应的节点了。

spark内部由调度器来为动作确定有效的执行计划,计划是根据RDD的结构信息来安排。调度器的接口是rubjob函数,参数是RDD及其分区,和分区上的函数。

调度器根据RDD的lineage图来创建一个由stage构成的有向无环图(DAG),如下图所示

上图中虚线表示stage,实线表示RDD,实心矩形表示分区(黑色表示该分区已被缓存),本例不再执行stage1,因为B已经存在于缓存中,只需执行stage2,stage3

在每一个stage内部尽可能的包含一组具有窄依赖关系的转化,并将它们流水线并行化。stage的边界有两种情况,一是宽依赖上的shuffle操作,这个很容易理解,因为宽依赖不可能实现流水线;二是已经缓存分区,已缓存分区直接取数就可以。

调度器根据数据存放的位置来分配任务,以最小化通信开销。如果某个任务需要处理已缓存的分区,则将该任务直接分配给该分区的节点;如果需要处理的分区位于多个可能的位置,则将该任务分配给一组节点。
对于宽依赖,目前的实现方式是在父分区的节点上将中间结果物化,简化容错处理。
如果某个任务失败,只要stage中父RDD分区可用,只需要在另一个节点上重新运行这个人员即可。如果某些stage不可用,则需要重新提交这个stage中所有的任务来恢复丢失的分区。

lookup动作允许从一个哈希分区或范围分区上的RDD根据关键字来读取一个数据元素。从这句话可以看到lookup是有使用单位的。只适用哈希或范围分区,行为有映射关系,就可以根据这种映射关系去寻找单个元素。

spark获取数据的方法

一是在驱动程序中对一个集合进行并行化,利用SparkContext()的parallelize(),如

sc.parallelize(["gdf","kgdr","vVy"])

这种方法需要将整个数据集先放在一台机器的内存中。

二是从外部读取,如

sc.textfile("文件路径")

如果sc的配置是连接集群,则从集群读取,如果没有配置,则从本地读取。

在 Spark 中使用 HDFS 只需要将输入输出路径指定为: hdfs://master:port/path 就够了。
如果不是从hdfs的系统文件夹中读取数据,而是从本地读取数据,需要同一个文件存在所有的worker node上面,在读取的时候每个worker node的task会去读取本文件的一部分,不然会出错;spark app所运行的节点也需要有这个file,因为需要用到file进行Partition划分。每个worker node的executor执行的task要读取的Split的Location信息是localhost,他不会到master上面读,只会在运行这个task的worker node本地读。
这种使用textFile方法读取本地文件系统的文件的方法,只能用于debug,不用于其他任何用途,因为他会导致file的replication数与node的个数同步增长;上述描述中的分成2份是默认值,你可以自己设置partition个数。具体可参考。

spark传递函数的两种方法

一是用lambda,比如

lambda line:"python" in line

二是自定义一个函数,比如,

define haspython(line):	return "python" in line

Spark中的函数

针对一个RDD的转化操作

函数名 功能 示例 操作类型 依赖类型
map() 传入一个函数作为参数,将函数应用RDD的每一个元素,一个输入元素对应一个输出元素,具有映射的作用,返回值构成一个新的RDD 转换操作 窄依赖
flatmap() 将函数应用于每一个元素,一个输入元素对应多个输出元素。返回值构成新的RDD 转换操作 窄依赖
filter() 利用传入的函数,达到筛选过滤的作用 转换操作 窄依赖
distinct() 去重 转换操作 窄依赖
sample(withreplacement,fraction,[seed]) 对RDD进行有放回或无放回的采样 转换操作 窄依赖
sort() 转换操作

针对两个RDD的转化操作

函数名 功能 示例 操作类型 依赖类型
union() 生成一个包含两个RDD中所有元素的RDD 转换操作 宽依赖
intersection() 求两个RDD共同元素的RDD 转换操作 宽依赖
subtract() 移除RDD中的一部分元素,被移除元素有另一个RDD给出 转换操作 宽依赖
Cartesian() 与另一个RDD的笛卡尔积 转换操作 宽依赖
join() 转换操作

动作操作

函数名 功能 示例 操作类型 依赖类型
collect() 返回RDD中的所有元素 动作操作
count() RDD中所有元素的个数 动作操作
countByValue() 各元素出现的个数 动作操作
take(num) 从RDD中返回num个元素 动作操作
top(num) 从RDD中返回最前面的RDD个元素 动作操作
takeOrderd(num) (ordering) 按照给定的顺序从RDD取num个元素,ordering表示顺序,myOrdering表示逆序 动作操作
takeSample(withReplacement,num,[seed]) 有放回或无放回的从RDD中返回任意一些元素,与sample()不同,sample()未触发计算,而takeSample()触发计算。 动作操作
reduce() 并行整合RDD中的所有元素 动作操作

pair RDD 的转化操作

函数名 功能 示例 操作类型 依赖类型
reduceByKey(func) 按照函数合并具有相同键的值 转换操作 窄依赖
groupByKey() 按键值分组 转换操作 窄依赖或宽依赖
combineByKey(createCombiner,
mergeValue,mergeCombiners,partitioner)
使用不同的返回类型,合并具有相同键的值 转换操作
mapValue(func) 对pair RDD中的每个值应用函数但不改变键 转换操作 窄依赖
flatMapValue(func) 对pair RDD中的每个值应用函数,一个值对应多个输出,不改变键 转换操作 窄依赖
keys() 返回仅包含键的RDD 转换操作 窄依赖
values() 返回键值对的值,构成新的RDD 转换操作 窄依赖
sortByKey() 按照键对键值对进行排序 转换操作

针对两个pair RDD的转化操作

函数名 功能 示例 操作类型 依赖类型
subtractByKey(other) 去掉原RDD中与other中相同键的键值对,得到新的键值对 转换操作
cogroup() 转换操作
join(other) 将两个键值对进行内连接,与mysql不同,这里去掉了空值 转换操作
rightOuterJoin() 将两个键值对进行连接,确保第二个RDD的键都存在 转换操作
leftOuterJoin() 将两个键值对进行连接,确保第一个RDD的键都存在 转换操作
cogroup() 将两个键值对中具有相同键的数据分组到一起,空值保留 转换操作

Pair RDD的行动操作

函数名 功能 示例 操作类型 依赖类型
countByKey() 对每个键对应的元素进行计数 动作操作
collectAsMap() 将结果以映射表的形式返回,以便查询 动作操作
lookup() 返回给定键对应的所有值 动作操作

待整理

crossProduct() 转换操作
partionBy() 转换操作
save() 动作操作

Spark中的传递函数

spark中的函数是对rdd中的每个元素执行相同的操作,这个是基本。然后第个节点都会处理一批数据。

利用分布式的特点的另一种变通的方式将参数rdd化,然后数据当成一个常数,如下所示

def h_w(rdd_in):    Y = rdd_in[3]    a = []    b = []    s = []    y = []    for i in range(rdd_in[4]):          a.append(sum(Y[0:rdd_in[4]]) / float(rdd_in[4]))        b.append((sum(Y[rdd_in[4]:2 * rdd_in[4]]) - sum(Y[0:rdd_in[4]])) / rdd_in[4] ** 2)        s.append(Y[i] / a[i])        y.append(Y[i])    # y.append(Y[i+1])    for i in range(rdd_in[4], len(Y)):        a.append(rdd_in[0] * (Y[i] / s[i - rdd_in[4]]) + (1 - rdd_in[0]) * (a[i - 1] + b[i - 1]))        b.append(rdd_in[1] * (a[i] - a[i - 1]) + (1 - rdd_in[1]) * b[i - 1])        s.append(rdd_in[2] * Y[i] / a[i] + (1 - rdd_in[2]) * s[i - rdd_in[4]])        y.append((a[i] + 1 * b[i]) * s[i - rdd_in[4] + 1])      rmse = sum((y - Y) ** 2)    # for j in range(1, h + 1):     #     y.append((a[i] + j * b[i]) * s[i - rdd_in[5] + j])    weight_rmse=[rdd_in[0], rdd_in[1], rdd_in[2], rmse, y]    return weight_rmseweights=[]m=5h=2for i in np.linspace(0,1,4):    for j in np.linspace(0,1,4):        for k in np.linspace(0,1,4):            weights.append((i,j,k,data,m,h))weights_rdd= sc.parallelize(weights)rmse_weight_rdd=weights_rdd.map(lambda x:h_w(x))

Spark中广播变量和累加器

转载地址:https://blog.csdn.net/qingqing7/article/details/78525593 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:实用工具---电脑系统---电脑系统安装(linux、windows)
下一篇:项目实例---随机森林在Kaggle实例:Titanic中的应用(二)

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月02日 17时54分19秒