spark
发布日期:2021-08-22 21:41:58 浏览次数:0 分类:技术文章

hot3.png

  spark

 

一、什么是spark,诞生于伯克利大学贝尔实验室

 

spark的四种运行模式

  • Local多用于测试
  • Standalone
  • Mesos(apache旗下的资源调度框架)
  • YARN最具有前景

 

RDD是基础     :Resilient Distributed Dataset          弹性分布式数据集  

  • 数据的一种瞬时状态

RDD的五大特性

  • A list of partitions  : 可以把数据放在多个分片上进行计算
  • A function for computing each split  : 每个split进行计算
  • A list of dependencies on other RDDs  :一般来说一个rdd要依赖于其他rdd
  • Optionally, a Partitionerfor key-value RDDs  : 
  • Optionally, a list of preferred locations to compute each split on  : 数据本地化计算,优先计算本地数据

 

算子:

transformations

  •  var data: RDD[String] = sc.textFile("G:\\user.txt")
  •     sc.parallelize(1 to 10 )

Action

 

map:对rdd中的元素操作

flatMap:对rdd中的结合中的元素操作

    var data: RDD[Array[Int]] = sc.parallelize(List(Array(1,2,4),Array(456,3,354),Array(322,42)))

    var data2: RDD[Int] = data.flatMap(x => x)

    val data3 = data.flatMap(x => {

      val a:Array[Int] = x

      a.map(_+2)

 

    })

 

    data2.foreach(println(_))

 

mapPartitions:去壳

  data.mapPartitions(part => part.filter(_.length>3)).flatMap(x=> x ).foreach(println(_))

 

第二集完

 

glom : 加壳

  var ss: RDD[Int] = sc.parallelize(1  to 10,3)

   var g: RDD[Array[Int]] =  ss.glom()

 

 

union  合并  同  ++

    var m = sc.parallelize(1 to 10 )

    var n = sc.parallelize(100 to 110 )

    m.union(n).foreach(println(_))

=

  (m ++ n).foreach(println(_))

 

cartesian  笛卡尔积

 var xx: RDD[(Int, Int)] =   m.cartesian(n)

   var b: Long = xx.count()

  m.cartesian(n).map(a =>{a._1 +"  "+a._2}).foreach(println(_))

 

sortByKey  和  sortBy

    m.cartesian(n).sortByKey(true,1).map(a =>{a._1 +"  "+a._2}).foreach(println(_))

    m.cartesian(n).sortBy(_._2,true,1).map(a =>{a._1 +"  "+a._2}).foreach(println(_))

 

filter

    var m = sc.parallelize(1 to 10 )

    m.filter(_>5).foreach(println(_))

 

distinct  去重

    var m: RDD[Int] = sc.parallelize(Array(1,1,2,3,5,66,66,67))

    m.distinct().foreach(println(_))

 

reduce :累加求和, reduceByKey 先分组,再累加求和

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))

 println( m.reduce((x,y) =>x+y))

 

   var n = sc.parallelize( List((1,3),(1,4),(3,2),(4,2),(4,2)))

    n.reduceByKey((x,y) => x+y).foreach(println(_))

结果:

(4,4)

(1,7)

(3,2)

 

subtract  m 集合 - n集合

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,6,7,7))

    var n: RDD[Int] = sc.parallelize(Array(1,2,3,4))

    m.subtract(n).foreach(println(_))

 

sample  :取样,第一个参数是是否放回,第二个参数是取出来的概率

    sc.parallelize(1 to 10).sample(true,0.5,1).foreach(println(_))

takeSample:取样,取指定个数个

  sc.parallelize(1 to 100).takeSample(true,10,1).foreach(println(_))

saveAsTextFile:保存为文件

   var xx: RDD[Int] = sc.parallelize(1 to 100)
    xx.saveAsTextFile("G:/text.txt")

collect:转换为scala集合,action算子

   var xx: RDD[Int] = sc.parallelize(1 to 100)
    var sxx: Array[Int] =xx.collect()

lookup:对所有分区的数据进行扫描,将指定键值的数据拿到

   var n = sc.parallelize( List((1,3),(1,4),(3,2),(4,2),(4,2)))
    n.lookup(1).foreach(println(_))

结果:

3

4

top:取值最大的几个元素

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
    m.top(2).foreach(println(_))

take:从集合前面取几个元素,返回值为scala的集合类型

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
    m.take(2).foreach(println(_))

drop:scala中的方法,不是算子和take对立,丢去

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
    m.take(2).drop(1).foreach(println(_))

 

fold:先默认值自加,在累加

    var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))

    println(m.fold(1)((x,y) => {x+y}))

获取系统时间

 var xx: Long =   System.currentTimeMillis()

 

持久化:Persist  Cache

使用缓存

var m: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
m.cache()
m.persist(StorageLevel.DISK_ONLY_2)
m.unpersist()  //在使用完数据后一定要使用该方法来释放内存

使用缓存级别涉及到的参数:

    private var _useDisk: Boolean,:使用硬盘  
    private var _useMemory: Boolean,:使用内存
    private var _useOffHeap: Boolean,:使用堆以外的
    private var _deserialized: Boolean,:不用序列化
    private var _replication: Int = 1        :副本数

  val NONE = new StorageLevel(false, false, false, false):完全不用缓存
  val DISK_ONLY = new StorageLevel(true, false, false, false):使用磁盘做缓存
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2):使用磁盘有两个副本
  val MEMORY_ONLY = new StorageLevel(false, true, false, true):使用内存,如果内存不够就放一部分,使用序列化
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2):同上,但是有两个副本
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false):使用内存缓存数据,使用序列化
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2):同上,有两个副本
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true):内存不够往磁盘放
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2):同上,有两个副本
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false):内存不过往磁盘放,序列化
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2):同上,有两个副本
  val OFF_HEAP = new StorageLevel(false, false, true, false):往tachyon(分布式内存文件系统)里放数据

 

java代码写spark程序  略

 

 

转载于:https://my.oschina.net/captainliu/blog/715877

上一篇:网上的一篇spring security详解教程,觉得不错,转过来了
下一篇:oracle pctfree和pctused详解