13. Spark RDD Partitioners

RDDs currently ship with Hash and Range partitioners, and Spark also allows user-defined partitioning. The steps for writing a custom partitioner are simple:

  • Extend the Partitioner abstract class
  • Implement the numPartitions and getPartition methods

To use a custom partitioner, just call rdd.partitionBy and pass the custom Partitioner in as the argument.
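For reference, the built-in HashPartitioner is used the same way. A minimal sketch (the pair RDD contents and the partition count of 2 are made up for illustration):

import org.apache.spark.HashPartitioner

val pairs = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
// Routes each record to a partition based on the non-negative value of key.hashCode % numPartitions
val hashed = pairs.partitionBy(new HashPartitioner(2))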

OK, it's fairly simple, so let's go straight to an example: send the nba records in the RDD to partition 0, the wnba records to partition 1, and everything else to partition 2:

package com.wzq.bigdata.spark.core.rdd.part

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Spark01_RDD_part {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("partitioner")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(List(
      ("nba", "xxxxxxx"),
      ("cba", "xxxxxxx"),
      ("wnba", "xxxxxxx"),
      ("nba", "xxxxxxx"),
    ))

    val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)

    partRDD.saveAsTextFile("output")

    sc.stop()
  }

  /**
   * Custom partitioner
   */
  class MyPartitioner extends Partitioner {

    // Number of partitions
    override def numPartitions: Int = 3

    // Given a record's key, return the index of the partition it belongs to (0-based)
    override def getPartition(key: Any): Int = {
      key match {
        case "nba" => 0
        case "wnba" => 1
        case _ => 2
      }
    }
  }

}
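Before (or instead of) writing to disk, you can also check where each record landed with mapPartitionsWithIndex; a quick sketch, with the output formatting purely illustrative:

partRDD
  .mapPartitionsWithIndex((idx, iter) => iter.map(kv => s"partition $idx => $kv"))
  .collect()
  .foreach(println)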

The result: three part files are written to the output directory, with part-00000 holding the two nba records, part-00001 the wnba record, and part-00002 the cba record.
