spark
一、什么是spark,诞生于伯克利大学贝尔实验室
spark的四种运行模式
- Local多用于测试
- Standalone
- Mesos(apache旗下的资源调度框架)
- YARN最具有前景
RDD是基础 :Resilient Distributed Dataset 弹性分布式数据集
- 数据的一种瞬时状态
RDD的五大特性
- A list of partitions : 可以把数据放在多个分片上进行计算
- A function for computing each split : 每个split进行计算
- A list of dependencies on other RDDs :一般来说一个rdd要依赖于其他rdd
- Optionally, a Partitionerfor key-value RDDs :
- Optionally, a list of preferred locations to compute each split on : 数据本地化计算,优先计算本地数据
算子:
transformations
- var data: RDD[String] = sc.textFile("G:\\user.txt")
- sc.parallelize(1 to 10 )
Action
map:对rdd中的元素操作
flatMap:对rdd中的结合中的元素操作
var data: RDD[Array[Int]] = sc.parallelize(List(Array(1,2,4),Array(456,3,354),Array(322,42)))
var data2: RDD[Int] = data.flatMap(x => x)
val data3 = data.flatMap(x => {
val a:Array[Int] = x
a.map(_+2)
})
data2.foreach(println(_))
mapPartitions:去壳
data.mapPartitions(part => part.filter(_.length>3)).flatMap(x=> x ).foreach(println(_))
第二集完
glom : 加壳
var ss: RDD[Int] = sc.parallelize(1 to 10,3)
var g: RDD[Array[Int]] = ss.glom()
union 合并 同 ++
var m = sc.parallelize(1 to 10 )
var n = sc.parallelize(100 to 110 )
m.union(n).foreach(println(_))
=
(m ++ n).foreach(println(_))
cartesian 笛卡尔积
var xx: RDD[(Int, Int)] = m.cartesian(n)
var b: Long = xx.count()
m.cartesian(n).map(a =>{a._1 +" "+a._2}).foreach(println(_))
sortByKey 和 sortBy
m.cartesian(n).sortByKey(true,1).map(a =>{a._1 +" "+a._2}).foreach(println(_))
m.cartesian(n).sortBy(_._2,true,1).map(a =>{a._1 +" "+a._2}).foreach(println(_))
filter
var m = sc.parallelize(1 to 10 )
m.filter(_>5).foreach(println(_))
distinct 去重
var m: RDD[Int] = sc.parallelize(Array(1,1,2,3,5,66,66,67))
m.distinct().foreach(println(_))
reduce :累加求和, reduceByKey 先分组,再累加求和
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
println( m.reduce((x,y) =>x+y))
var n = sc.parallelize( List((1,3),(1,4),(3,2),(4,2),(4,2)))
n.reduceByKey((x,y) => x+y).foreach(println(_))
结果:
(4,4)
(1,7)
(3,2)
subtract m 集合 - n集合
var m: RDD[Int] = sc.parallelize(Array(1,2,3,6,7,7))
var n: RDD[Int] = sc.parallelize(Array(1,2,3,4))
m.subtract(n).foreach(println(_))
sample :取样,第一个参数是是否放回,第二个参数是取出来的概率
sc.parallelize(1 to 10).sample(true,0.5,1).foreach(println(_))
takeSample:取样,取指定个数个
sc.parallelize(1 to 100).takeSample(true,10,1).foreach(println(_))
saveAsTextFile:保存为文件
var xx: RDD[Int] = sc.parallelize(1 to 100)
xx.saveAsTextFile("G:/text.txt")collect:转换为scala集合,action算子
var xx: RDD[Int] = sc.parallelize(1 to 100)
var sxx: Array[Int] =xx.collect()lookup:对所有分区的数据进行扫描,将指定键值的数据拿到
var n = sc.parallelize( List((1,3),(1,4),(3,2),(4,2),(4,2)))
n.lookup(1).foreach(println(_))结果:
3
4
top:取值最大的几个元素
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
m.top(2).foreach(println(_))take:从集合前面取几个元素,返回值为scala的集合类型
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
m.take(2).foreach(println(_))drop:scala中的方法,不是算子和take对立,丢去
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
m.take(2).drop(1).foreach(println(_))
fold:先默认值自加,在累加
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
println(m.fold(1)((x,y) => {x+y}))
获取系统时间
var xx: Long = System.currentTimeMillis()
持久化:Persist Cache
使用缓存
var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
m.cache() m.persist(StorageLevel.DISK_ONLY_2) m.unpersist() //在使用完数据后一定要使用该方法来释放内存使用缓存级别涉及到的参数:
private var _useDisk: Boolean,:使用硬盘
private var _useMemory: Boolean,:使用内存 private var _useOffHeap: Boolean,:使用堆以外的 private var _deserialized: Boolean,:不用序列化 private var _replication: Int = 1 :副本数val NONE = new StorageLevel(false, false, false, false):完全不用缓存
val DISK_ONLY = new StorageLevel(true, false, false, false):使用磁盘做缓存 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2):使用磁盘有两个副本 val MEMORY_ONLY = new StorageLevel(false, true, false, true):使用内存,如果内存不够就放一部分,使用序列化 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2):同上,但是有两个副本 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false):使用内存缓存数据,使用序列化 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2):同上,有两个副本 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true):内存不够往磁盘放 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2):同上,有两个副本 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false):内存不过往磁盘放,序列化 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2):同上,有两个副本 val OFF_HEAP = new StorageLevel(false, false, true, false):往tachyon(分布式内存文件系统)里放数据
java代码写spark程序 略