기술나눔

빅 데이터 학습을 위한 Spark 기본 사항

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

스파크 기본 사항

간단한 설명

1. 스파크 작업 실행의 특징:

(1) 액션 연산자를 만난 경우에만 스파크 작업 전체가 실행을 위해 트리거됩니다.

(2) 여러 번 만나고, 여러 번 실행됨

2. RDD: 탄력적인 분산 데이터 세트

유연성: 데이터 양이 많거나 적을 수 있음

RDD는 컨테이너와 유사하지만 데이터를 저장하지 않고 계산 논리를 저장합니다.

작업 연산자가 발생하면 전체 스파크 작업이 실행되어 첫 번째 RDD부터 데이터 흐름이 시작됩니다.

데이터는 RDD 간의 흐름 관계일 뿐이며 저장되지 않습니다.

흐르는 데이터의 양은 클 수도 있고 작을 수도 있으므로 이를 탄력성이라고 합니다.

배포:

Spark는 기본적으로 HDFS에서 데이터를 읽어야 합니다. HDFS는 분산되어 있으며 데이터 블록은 향후 다른 데이터 노드에 있을 수 있습니다.

RDD로 흐르는 데이터는 서로 다른 데이터노드의 블록 데이터에서 나올 수 있습니다.

데이터 세트:

계산 흐름 과정에서 RDD는 간단히 컨테이너로 간주될 수 있으며, 컨테이너에는 기본적으로 메모리에 저장되지 않는 데이터가 있습니다.

나중에 RDD의 데이터를 디스크에 저장하는 방법이 있을 것입니다.

RDD의 5가지 주요 특징: (인터뷰에서 꼭 물어보세요!)
1. RDD는 일련의 파티션으로 구성됩니다.

1) 파일을 읽을 때 minPartitions 매개변수는 최소 파티션 수만 결정할 수 있습니다. 파일을 읽은 후 실제 RDD 파티션 수는 데이터 콘텐츠 자체와 클러스터 분포에 따라 결정됩니다.

2) 설정된 minPartitions 개수가 블록 개수보다 적을 경우 실제로는 블록 개수에 따라 파티션 개수가 결정됩니다.

3) 셔플을 생성하는 연산자를 호출할 때 RDD의 파티션 수를 실제로 변경할 수 있는 numPartitions(예: groupby())를 전달할 수 있습니다. 이는 설정된 파티션 수에 따라 최종 RDD의 파티션 수가 결정됩니다. 가지다.

4) 파일은 블록 형태로 HDFS에 저장됩니다. 파일이 기본값인 128M에 도달하지 않으면 블록에도 저장됩니다.

처음에 RDD의 파티션 수는 데이터를 읽는 블록 수에 따라 결정됩니다.

KV 함수를 제외한 후자 RDD의 파티션 데이터는 이전 RDD의 파티션 데이터를 논리적으로 처리한 결과에 해당한다. 기본적으로 후속 파티션이 처리되지 않으면 모든 후속 RDD의 파티션 수는 첫 번째 RDD에 따라 달라집니다.

최종 RDD에는 여러 개의 파티션이 있으며 앞으로는 HDFS에서 여러 결과 파일을 볼 수 있습니다(HDFS -> RDD -> HDFS).

2. 운영자는 각 파티션에 대해 작업합니다. (각 파티션이 처리됩니다.)
3. RDD와 RDD 사이에는 몇 가지 종속성이 있습니다.

1) 이전 RDD의 특정 파티션의 데이터가 다음 RDD의 고유 파티션으로만 이동하는(또는 이전 여러 파티션이 다음 파티션으로 이동할 수 있음) 일대일 관계에만 의존합니다.

2) 이전 RDD에 크게 의존하는 이전 RDD의 특정 파티션의 일대다 관계는 다음 RDD에서 다른 파티션으로 들어가게 되며 셔플이 생성되는지 확인하여 판단할 수도 있습니다.

3) 전체 스파크 작업은 넓은 종속성 수에 따라 여러 단계로 나누어집니다. Num(단계) = Num(넓은 종속성) + 1

4) 셔플을 생성하는 연산자를 만나면 다음이 포함됩니다.이전 RDD의 데이터를 디스크에 쓰고 디스크의 데이터를 다음 RDD로 읽는 현상입니다.

참고: 실행이 처음으로 트리거되면 디스크에 데이터가 없으므로 생성된 첫 번째 RDD부터 실행이 시작됩니다.

동일한 실행이 반복적으로 트리거되면 동일한 DAG 방향성 비순환 그래프에 대해 셔플 후 RDD에서 직접 실행이 시작되고(이전 RDD에서 디스크에 데이터를 쓰는 과정 생략) 디스크에서 직접 읽을 수 있습니다. .

5) **한 단계에서는RDD는여러 파티션을 사용하면 여러 병렬 작업이 수행됩니다.

4. kv 연산자는 kv의 RDD에만 작용할 수 있습니다.
5. Spark는 데이터가 아닌 이동 계산만 제공하는 최적의 작업 계산 방법을 제공합니다.

스파크의 디자인 원칙 중 하나는데이터 현지화(Data Locality) 즉, 데이터가 위치한 노드에서 컴퓨팅 작업을 실행하도록 하여 데이터의 네트워크 전송 오버헤드를 줄입니다.

Spark 인스턴스: 단어 수
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setMaster
    //将来如果是集群进行,将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
    //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
        println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
        println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
        println(s"resRDD的分区数:${resRDD.getNumPartitions}")

    //打印
    resRDD2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resRDD.saveAsTextFile("spark/data/outdata1")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
Spark에서 RDD가 호출하는 함수를 연산자라고 합니다.

연산자는 두 가지 범주로 나뉩니다.

1. 변환 연산자(RDD -> RDD, 처리 로직)

2. 액션 연산자(작업 실행 트리거)

1. 변환 연산자
1)지도
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
      println("==============处理后的数据========================")
      val array1: Array[String] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2)필터
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
      var b: Boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderRDD.foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3) 플랫맵
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo3FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")

    /**
     * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
      println("===============一条数据====================")
      line.split("\|")
    })

    rdd1.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
4) 샘플
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withReplacement :
     * 为True时,抽样结果中可能会包含重复的元素。
     * 为False时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5) 그룹별
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupBy")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))

    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * groupBy算子的使用
     *
     * 1、groupBy的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
//    groupRDD.foreach(println)

    val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
      val clazz: String = kv._1
      val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size

      (clazz, avgAge)
    })
    resKvRDD.foreach(println)

//    while (true){
//
//    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

여기에 이미지 설명을 삽입하세요.

6)그룹별키
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo6GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))


    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
     * 也就说,只有kv格式的RDD才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

    val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
    resKvRDD2.foreach(println)

    /**
     * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
     * 1、代码格式上:
     * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
     * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
     *
     * 2、执行shuffle数据量来看
     *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
     *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
     */

    while (true) {

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

여기에 이미지 설명을 삽입하세요.

7) reduceByKey로 줄이기
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo7ReduceByKey {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    //分别使用groupByKey和reduceBykey计算每个学生的总分
    val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(id: String, _, score: String) =>
        (id, score.toInt)
    }

    /**
     * groupByKey实现
     */
//        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
//        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
//        resRDD1.foreach(println)

    /**
     * reduceByKey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    resRDD2.foreach(println)


    /**
     * 面试题:
     * groupByKey与reduceBykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的RDD才能调用
     * 不同点:
     * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
     */

    while (true){

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

여기에 이미지 설명을 삽입하세요.

8)연합
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getNumPartitions}")

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getNumPartitions}")

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
    //    rdd1.union(rdd3)
    val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
    resRDD1.foreach(println)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resRDD1的分区数:2
     */
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

9)가입하다

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * join算子也要作用在kv格式的RDD上
 */
object Demo9Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, like: String)) =>
            (id, name, like)
        }
        resRDD2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), like: String)) =>
            (id, name, like)
          case (id: String, (None, like: String)) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    val resRDD2: RDD[(String, String, String)] = resRDD1.map {
      case (id: String, (name: String, Some(like: String))) =>
        (id, name, like)
      case (id: String, (name: String, None)) =>
        (id, name, "此人无爱好")
    }
    resRDD2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    val resRDD3: RDD[(String, String, String)] = resRDD2.map {
      case (id: String, (Some(name), Some(like))) =>
        (id, name, like)
      case (id: String, (Some(name), None)) =>
        (id, name, "此人无爱好")
      case (id: String, (None, Some(like))) =>
        (id, "查无此人", like)
    }
    resRDD3.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110