Technology Sharing

Spark Basics of Big Data Learning

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Spark Basics

Brief description

1. Characteristics of spark job execution:

(1) The entire Spark job will be triggered and executed only when an action operator is encountered

(2) Execute the operation several times when the problem occurs

2. RDD: Resilient Distributed Dataset

Elasticity: The amount of data can be large or small

RDD is similar to a container, but it does not store data, but computing logic.

When an action operator is encountered, the entire spark job will be triggered to execute, starting from the first RDD, and data will begin to flow.

Data only flows between RDDs and is not stored.

The amount of data flowing can be large or small, so it is called elastic

distributed:

Spark essentially needs to read data from HDFS. HDFS is distributed, and data blocks may be on different datanodes in the future.

The data flowing in RDD may come from block data in different datanodes.

data set:

During the computation process, RDD can be briefly viewed as a container that contains data, which is not stored in memory by default.

There will be a way to store an RDD's data to disk later.

The five major characteristics of RDD: (must ask in the interview!)
1. RDD is composed of a series of partitions

1) The minPartitions parameter when reading a file can only determine the minimum number of partitions. The number of RDD partitions after reading the file is actually determined by the data content itself and the distribution of the cluster.

2) If the number of minPartitions is set to be less than the number of blocks, the number of partitions is actually determined by the number of blocks.

3) When calling the operator that generates the shuffle, you can pass in numPartitions (for example, groupby()), which can actually change the number of partitions of the RDD. The number of partitions in the final RDD will be determined by the number of partitions you set.

4) Files are stored in HDFS in the form of blocks. If the file size does not reach the default value of 128M, it will also be stored in one block.

Initially, the number of partitions in the RDD is determined by the number of blocks used to read data.

The partition data in the subsequent RDD, except for the KV function, corresponds to the result of the logical processing of the partition data in the previous RDD. By default, if the subsequent partitions are not processed, the number of partitions of all subsequent RDDs depends on the first RDD.

Finally, there are several partitions in the RDD, and in the future you will see several result files in HDFS (HDFS -> RDD -> HDFS)

2. The operator acts on each partition (each partition will be processed)
3. There are some dependencies between RDDs

1) Narrow dependency: data from a partition in the previous RDD will only be transferred to a unique partition in the next RDD one-to-one (or multiple partitions in the previous RDD may be transferred to the next partition)

2) Wide dependency: Data from a partition in the previous RDD will be included in different partitions in the next RDD. One-to-many relationships can also be determined by checking whether shuffle occurs.

3) The entire spark job will be divided into several stages according to the number of wide dependencies, Num(stage) = Num(wide dependencies) + 1

4) When encountering an operator that generates shuffle, it involvesThe phenomenon of writing data from the previous RDD to disk and reading data from the disk to the next RDD.

Note: When the execution is triggered for the first time, there is no data on the disk, so the execution will start from the first RDD generated.

When the same execution is triggered repeatedly, for the same DAG directed acyclic graph, execution will start directly from the RDD after shuffle (omitting the process of writing data from the previous RDD to the disk), and data can be read directly from the disk.

5) **In one stage,RDD hasSeveral partitions, ** there will be several parallel tasks

4. The kv operator can only be used on the RDD of kv
5. Spark will provide the optimal task computing method, which only moves the calculation but not the data.

One of the design principles of Spark isData localization(Data Locality), that is, try to execute computing tasks on the nodes where the data is located, thereby reducing the network transmission overhead of the data.

Spark Example: wordcount
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setMaster
    //将来如果是集群进行,将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
    //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
        println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
        println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
        println(s"resRDD的分区数:${resRDD.getNumPartitions}")

    //打印
    resRDD2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resRDD.saveAsTextFile("spark/data/outdata1")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
The function called by RDD in Spark is called operator

Operators are divided into two categories:

1. Conversion operator (RDD -> RDD, processing logic)

2. Action operator (triggering job execution)

1. Conversion Operator
1)Map
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
      println("==============处理后的数据========================")
      val array1: Array[String] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2)filter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
      var b: Boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderRDD.foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3)flatMap
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo3FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")

    /**
     * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
      println("===============一条数据====================")
      line.split("\|")
    })

    rdd1.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
4)sample
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withReplacement :
     * 为True时,抽样结果中可能会包含重复的元素。
     * 为False时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5)groupBy
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupBy")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))

    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * groupBy算子的使用
     *
     * 1、groupBy的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
//    groupRDD.foreach(println)

    val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
      val clazz: String = kv._1
      val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size

      (clazz, avgAge)
    })
    resKvRDD.foreach(println)

//    while (true){
//
//    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

insert image description here

6)groupByKey
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo6GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))


    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
     * 也就说,只有kv格式的RDD才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

    val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
    resKvRDD2.foreach(println)

    /**
     * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
     * 1、代码格式上:
     * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
     * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
     *
     * 2、执行shuffle数据量来看
     *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
     *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
     */

    while (true) {

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

insert image description here

7)reduceByKey
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo7ReduceByKey {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    //分别使用groupByKey和reduceBykey计算每个学生的总分
    val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(id: String, _, score: String) =>
        (id, score.toInt)
    }

    /**
     * groupByKey实现
     */
//        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
//        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
//        resRDD1.foreach(println)

    /**
     * reduceByKey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    resRDD2.foreach(println)


    /**
     * 面试题:
     * groupByKey与reduceBykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的RDD才能调用
     * 不同点:
     * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
     */

    while (true){

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

insert image description here

8)union
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getNumPartitions}")

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getNumPartitions}")

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
    //    rdd1.union(rdd3)
    val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
    resRDD1.foreach(println)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resRDD1的分区数:2
     */
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

9)join

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * join算子也要作用在kv格式的RDD上
 */
object Demo9Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, like: String)) =>
            (id, name, like)
        }
        resRDD2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), like: String)) =>
            (id, name, like)
          case (id: String, (None, like: String)) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    val resRDD2: RDD[(String, String, String)] = resRDD1.map {
      case (id: String, (name: String, Some(like: String))) =>
        (id, name, like)
      case (id: String, (name: String, None)) =>
        (id, name, "此人无爱好")
    }
    resRDD2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    val resRDD3: RDD[(String, String, String)] = resRDD2.map {
      case (id: String, (Some(name), Some(like))) =>
        (id, name, like)
      case (id: String, (Some(name), None)) =>
        (id, name, "此人无爱好")
      case (id: String, (None, Some(like))) =>
        (id, "查无此人", like)
    }
    resRDD3.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110