Spark中的数据读取保存和累加器实例详解

2023-12-05 0 400
目录
  • 数据读取与保存
    • Text文件
    • Sequence文件
    • Object对象文件
  • 累加器
    • 累加器概念
    • 系统累加器

数据读取与保存

Text文件

对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆。

1)基本语法

(1)数据读取:textFile(String)

(2)数据保存:saveAsTextFile(String)

2)实现代码demo如下:

object Operate_Text {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName(\”SparkCoreTest\”).setMaster(\”local[1]\”)
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 读取输入文件
val inputRDD: RDD[String] = sc.textFile(\”input/demo.txt\”)
//3.2 保存数据
inputRDD.saveAsTextFile(\”textFile\”)
//4.关闭连接
sc.stop()
}
}

Sequence文件

SequenceFile文件 是Hadoop中用来存储二进制形式的 key-value对 的一种平面文件(Flat File)。在SparkContext中,可以通过调用 sequenceFile[ keyClass,valueClass ] (path) 来调用。

1)基本语法

  • (1)数据读取:sequenceFile[ keyClass, valueClass ] (path)
  • (2)数据保存:saveAsSequenceFile(String)

2)实现代码demo如下:

object Operate_Sequence {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName(\”SparkCoreTest\”).setMaster(\”local[1]\”)
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建rdd
val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
//3.2 保存数据为SequenceFile
dataRDD.saveAsSequenceFile(\”seqFile\”)
//3.3 读取SequenceFile文件
sc.sequenceFile[Int,Int](\”seqFile\”).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}

Object对象文件

对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过 objectFile[ k , v ] (path) 函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为要序列化所以要指定类型。

1)基本语法

  • (1)数据读取:objectFile[ k , v ] (path)
  • (2)数据保存:saveAsObjectFile(String)

2)实现代码demo如下:

object Operate_Object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName(\”SparkCoreTest\”).setMaster(\”local[1]\”)
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
//3.2 保存数据
dataRDD.saveAsObjectFile(\”objFile\”)
//3.3 读取数据
sc.objectFile[Int](\”objFile\”).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}

累加器

累加器概念

累加器,是一种变量—分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。

累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

系统累加器

1)累加器定义(SparkContext.accumulator(initialValue)方法)

val sum: LongAccumulator = sc.longAccumulator("sum")

2)累加器添加数据(累加器.add方法)

sum.add(count)

3)累加器获取数据(累加器.value)

sum.value

注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

4)累加器要放在行动算子中

因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。

5) 代码实现:

object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(\”SparkCoreTest\”).setMaster(\”local[*]\”)
val sc = new SparkContext(conf)
val dataRDD: RDD[(String, Int)] = sc.makeRDD(List((\”a\”, 1), (\”a\”, 2), (\”a\”, 3), (\”a\”, 4)))
//需求:统计a出现的所有次数 (\”a\”,10)
//普通算子实现 reduceByKey 代码会走shuffle 效率低
val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
//累加器实现
//1 声明累加器
val accSum: LongAccumulator = sc.longAccumulator(\”sum\”)
dataRDD.foreach{
case (a,count) => {
//2 使用累加器累加 累加器.add()
accSum.add(count)
// 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量
//println(\”sum = \” + accSum.value)
}
}
//3 获取累加器的值 累加器.value
println((\”a\”,accSum.value))
sc.stop()
}
}

以上就是Spark中的数据读取保存和累加器实例详解的详细内容,更多关于Spark数据读取保存累加器的资料请关注悠久资源网其它相关文章!

您可能感兴趣的文章:

  • 详解Spark Sql在UDF中如何引用外部数据
  • Spark处理trick总结分析
  • Spark GraphX 分布式图处理框架图算法详解
  • pyspark自定义UDAF函数调用报错问题解决
  • Apache Hudi集成Spark SQL操作hide表
  • Java开发Spark应用程序自定义PipeLineStage详解

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

悠久资源 编程技巧 Spark中的数据读取保存和累加器实例详解 https://www.u-9.cn/biancheng/jiqiao/98262.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务