Spark删除redis千万级别set集合数据实现分析

2023-12-07 0 812
目录
  • 1.使用pipline的原因
  • 2.方法
    • 2.1写入redis的方法
      • 2.1.1参数说明
    • 2.2读取本地待删除数据的方法
      • 2.2.1参数说明
    • 2.3调用pipline删除的方法
      • 2.3.1参数说明
  • 3.完整代码
    • 4.总结

      1.使用pipline的原因

      Redis 使用的是客户端-服务器(CS)模型和请求/响应协议的 TCP 服务器。

      这意味着通常情况下一个请求会遵循以下步骤:

      • 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等待服务端响应。
      • 服务端处理命令,并将结果返回给客户端。
      • 管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。

      通俗点:

      • pipeline就是把一组命令进行打包,然后一次性通过网络发送到Redis。同时将执行的结果批量的返回回来
      • pipelined.sync()表示我一次性的异步发送到redis,不关注执行结果。
      • pipeline.syncAndReturnAll ();将返回执行过的命令结果返回到List列表中

      2.方法

      2.1写入redis的方法

      2.1.1参数说明

      sc:SparkContext Spark上下文spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

      def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
      // spark读取数据集
      val df: DataFrame = spark.read.parquet(\”file:///F://delRedisData//1//delData.snappy.parquet\”)
      df.show(1,false)
      val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String](\”r\”))
      // 这个集合写的是2000多万的数据
      sc.toRedisSET(rdd,\”test:task:deplicate\”)
      }

      2.2读取本地待删除数据的方法

      2.2.1参数说明

      sc:SparkContext Spark上下文

      spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

      def readParquet(spark: SparkSession,path:String): RDD[String] ={
      val df: DataFrame = spark.read.parquet(path)
      val strRDD: RDD[String] = df.rdd.map(_.getAs[String](\”r\”))
      // 返回String类型的RDD
      strRDD
      }

      2.3调用pipline删除的方法

      2.3.1参数说明

      collectionName 其中redis set集合的名称

      num是要删除的数据量是多少

      arr是要删除的数据存放的是set集合的key

      jedis是redis的客户端

      def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
      try{
      val pipeline: Pipeline = jedis.pipelined()
      // 选择数据库 默认为 0
      pipeline.select(1)
      for(i <- 0 to (num – 1) ){
      pipeline.srem(collectionName,arr(i))
      }
      //表示我一次性的异步发送到redis,不关注执行结果
      pipeline.sync()
      }catch {
      case e : JedisException => e.printStackTrace()
      }finally if(jedis !=null) jedis.close()
      }

      3.完整代码

      import com.redislabs.provider.redis._
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.{SparkConf, SparkContext}
      import redis.clients.jedis.exceptions.JedisException
      import redis.clients.jedis.{Jedis, Pipeline}
      /**
      * Date 2022/5/25 17:57
      */
      object DelRedis {
      def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      // 驱动进程使用的内核数,仅在集群模式下使用。
      .set(\”spark.driver.cores\”,\”5\”)
      /**
      * 驱动进程使用的内存数量,也就是SparkContext初始化的地方,
      * 其格式与JVM内存字符串具有大小单位后缀(“k”,“m”,“g”或“t”)(例如512m, 2g)相同。
      * 注意:在客户端模式下,不能直接在应用程序中通过SparkConf设置此配置,因为此时驱动程
      * 序JVM已经启动。相反,请通过——driver-memory命令行选项或在默认属性文件中设置。
      */
      .set(\”spark.driver.memory\”,\”5g\”)
      /**
      * 限制每个Spark操作(例如collect)的所有分区的序列化结果的总大小(以字节为单位)。
      * 应该至少是1M,或者0表示无限制。如果总大小超过此限制,则作业将被终止。
      * 过高的限制可能会导致驱动程序内存不足错误(取决于spark.driver.memory和JVM中对象的内存开销)。
      * 设置适当的限制可以防止驱动程序出现内存不足的错误。
      */
      .set(\”spark.driver.maxResultSize\”,\”10g\”)
      /**
      * 每个执行程序进程使用的内存数量,
      * 格式与带有大小单位后缀(“k”,“m”,“g”或“t”)的JVM内存字符串相同(例如512m, 2g)。
      *
      */
      .set(\”spark.executor.memory\”,\”5g\”)
      /**
      * 默认 1在YARN模式下,worker上所有可用的内核在standalone和Mesos粗粒度模式下。
      */
      .set(\”spark.executor.cores\”,\”5\”)
      val spark: SparkSession = SparkSession.builder().appName(\”DelRedis\”).master(\”local[*]\”)
      .config(\”spark.redis.host\”,\”192.168.100.201\”)
      .config(\”spark.redis.port\”,\”6379\”)
      .config(\”spark.redis.db\”,\”1\”) // 可选的数据库编号。避免使用它,尤其是在集群模式下,redisRedis默认支持16个数据库,默认是选择数据库0,这里设置为1。
      .config(\”spark.redis.timeout\”,\”2000000\”) // 连接超时,以毫秒为单位,默认为 2000 毫秒
      .config(conf)
      .getOrCreate()
      val sc: SparkContext = spark.sparkContext
      //1.写入数据集
      writeRedis(sc,spark)
      // 2.读取待删除的数据key
      val path = \”file:///F://delRedisData//test.parquet\”
      val rdd: RDD[String] = readParquet(spark,path)
      //3.使用redis 中的 pipeline 方法 进行删除操作
      rdd.foreachPartition(iter=>{
      // 连接redis客户端
      val jedis = new Jedis(\”192.168.100.201\”,6379)
      val array: Array[String] = iter.toArray
      val length: Int = array.length
      val beginTime: Long = System.currentTimeMillis()
      delPipleine(collectionName,length,array,jedis)
      val endTime: Long = System.currentTimeMillis()
      println(\”删除:\”+length+\”条数据,耗时:\”+(endTime-beginTime)/1000+\”秒\”)
      })
      sc.stop()
      spark.stop()
      }
      def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
      try{
      val pipeline: Pipeline = jedis.pipelined()
      // 选择数据库 默认为 0
      pipeline.select(1)
      for(i <- 0 to (num – 1) ){
      pipeline.srem(collectionName,arr(i))
      }
      //表示我一次性的异步发送到redis,不关注执行结果
      pipeline.sync()
      }catch {
      case e : JedisException => e.printStackTrace()
      }finally if(jedis !=null) jedis.close()
      }
      def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
      // spark读取数据集
      val df: DataFrame = spark.read.parquet(\”file:///F://delRedisData//1//delData.snappy.parquet\”)
      df.show(1,false)
      val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String](\”r\”))
      // 这个集合写的是2000多万的数据
      sc.toRedisSET(rdd,\”test:task:deplicate\”)
      }
      def readParquet(spark: SparkSession,path:String): RDD[String] ={
      val df: DataFrame = spark.read.parquet(path)
      val strRDD: RDD[String] = df.rdd.map(_.getAs[String](\”r\”))
      // 返回String类型的RDD
      strRDD
      }
      }

      4.总结

      经检测:redis 的 pipeline(管道)方法 ,经单机版的redis测试 ,百万级别数据删除仅需要1分钟左右与硬件有关,还包括读取数据的时长等方面原因

      以上就是Spark删除redis千万级别set集合数据实现分析的详细内容,更多关于Spark删除redis set集合的资料请关注悠久资源其它相关文章!

      收藏 (0) 打赏

      感谢您的支持,我会继续努力的!

      打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
      点赞 (0)

      悠久资源 Redis Spark删除redis千万级别set集合数据实现分析 https://www.u-9.cn/database/redis/122706.html

      常见问题

      相关文章

      发表评论
      暂无评论
      官方客服团队

      为您解决烦忧 - 24小时在线 专业服务