Spark - Spark Streaming 阶段性总结(2017-05-11)
参考:
发布日期:2021-06-30 19:50:41
浏览次数:3
分类:技术文章
本文共 2375 字,大约阅读时间需要 7 分钟。
1. spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable
出现“task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。
解决这个问题最常用的方法有:
- 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
- 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
- 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。将引用的类做成可序列化的。
2. SparkStreaming的Broadcast
object IPRuels { // 将ip地址转换为整数 def ip2num(ip : String) : Long = { val fragments = ip.split("\\.") var ipNum = 0L for (i <- 0 until fragments.length) { // 与运算 ipNum = fragments(i).toLong | ipNum << 8L } ipNum } @volatile private var instance : Broadcast[Array[(Long, Long, String)]] = null def getInstance(sc: SparkContext): Broadcast[Array[(Long, Long, String)]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line => // map RDD 的Transformation 操作,用 f 处理一个Rdd 的所有元素,将结果输出到另一个Rdd { val fields = line.trim().split("\t") val start_num = ip2num(fields(0).trim()) val end_num = ip2num(fields(1).trim()) val province = fields(2).trim() (start_num, end_num, province) }).collect() instance = sc.broadcast(wordBlacklist) } } } instance }}
def IPLocation(dStream: DStream[String], sc : SparkContext, mongoSQL: MongoSQL): Unit = { val ipRulesBroadcast = IPRuels.getInstance(sc) val result = dStream.map(line => { ipPat.findFirstIn(line.toString()).mkString("")}) .map(ip => { var info : Any = None if(!ip.isEmpty) { val ipNum = ip2num(ip) val index = binarySearch(ipRulesBroadcast.value, ipNum) info = ipRulesBroadcast.value(index) } (info, 1L)}) .reduceByKey(_+_) result.foreachRDD((rdd : RDD[(Any, Long)]) => { // 将total 写在map,for里 var total = 1L try { total = rdd.reduce((x, y) => ("t", x._2 + y._2))._2 } catch { case ex : UnsupportedOperationException => { total = 1 } } val rowRdd = rdd.map(x => { val r = x._2.toFloat/total (x._1, r) }).map(line => Row(line._1.toString, line._2.toString)) val schemaStr = "loc,rate" mongoSQL.put(schemaStr, rowRdd, "IP_rate_2") }) }
3. 保存3个好博客
转载地址:https://lipenglin.blog.csdn.net/article/details/71617046 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月07日 15时24分37秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Linux png转jpg (convert命令)
2019-04-30
NAS (Network Attached Storage 网络附属存储)
2019-04-30
Ubuntu更新后终端中字体的颜色全是白色
2019-04-30
vscode git
2019-04-30
基于MATLAB的二进制数字调制与解调信号的仿真——2FSK
2019-04-30
基于MATLAB的二进制数字调制与解调信号的仿真——2PSK
2019-04-30
基于MATLAB的模拟调制信号与解调的仿真——AM
2019-04-30
基于MATLAB的模拟调制信号与解调的仿真——DSB
2019-04-30
基于MATLAB的模拟调制信号与解调的仿真——SSB
2019-04-30
操作系统实验之生产者和消费者程序
2019-04-30
操作系统实验之猴子过桥问题的模拟程序
2019-04-30
POJ - 3067 Japan (树状数组 思维)
2019-04-30
POJ - 2352 Stars (树状数组 入门题)
2019-04-30
HDU - 1166 敌兵布阵 (树状数组模板题/线段树模板题)
2019-04-30
CodeForces - 761C Dasha and Password (思维 暴力)
2019-04-30
POJ - 2481 Cows (树状数组 入门题)
2019-04-30
ACM-ICPC 2018 焦作赛区网络预赛 I. Save the Room
2019-04-30
CodeForces - 987C Three displays (暴力/dp)
2019-04-30