Partage de technologie

Principes de base de Spark pour l'apprentissage du Big Data

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Les bases de Spark

Brève description

1. Caractéristiques de l'exécution du travail Spark :

(1) Ce n'est que lorsqu'un opérateur d'action est rencontré que l'intégralité de la tâche Spark sera déclenchée pour exécution.

(2) Rencontré plusieurs fois, exécuté plusieurs fois

2. RDD : ensemble de données distribuées élastiques

Flexibilité : la quantité de données peut être grande ou petite

RDD est similaire à un conteneur, mais il ne stocke pas de données, mais une logique de calcul.

Lorsqu'un opérateur d'action est rencontré, l'intégralité du travail Spark sera déclenchée pour s'exécuter à partir du premier RDD, les données commenceront à circuler.

Les données ne constituent qu'une relation fluide entre les RDD et ne seront pas stockées.

La quantité de données circulant peut être grande ou petite, c'est ce qu'on appelle l'élasticité

distribué:

Spark a essentiellement besoin de lire les données de HDFS. HDFS est distribué et les blocs de données pourraient se trouver sur différents nœuds de données à l'avenir.

Les données circulant dans RDD peuvent provenir de blocs de données dans différents nœuds de données.

base de données:

Pendant le processus de flux de calcul, RDD peut être brièvement considéré comme un conteneur. Il contient des données qui ne sont pas stockées en mémoire par défaut.

Il y aura un moyen de stocker les données d'un RDD sur le disque plus tard.

Cinq caractéristiques majeures du RDD : (à demander lors des entretiens !)
1. RDD est composé d'une série de partitions

1) Le paramètre minPartitions lors de la lecture d'un fichier ne peut déterminer que le nombre minimum de partitions. Le nombre réel de partitions RDD après la lecture du fichier est déterminé par le contenu des données lui-même et la distribution du cluster.

2) Si le nombre de minPartitions défini est inférieur au nombre de blocs, le nombre de partitions est en fait déterminé par le nombre de blocs.

3) Lorsque vous appelez l'opérateur qui génère le shuffle, vous pouvez transmettre numPartitions (par exemple : groupby()), ce qui peut en fait modifier le nombre de partitions dans le RDD. Le nombre de partitions que vous définissez déterminera le nombre de partitions du RDD final. avoir.

4) Le fichier sera stocké sur HDFS sous forme de blocs. Si le fichier n'atteint pas la valeur par défaut de 128M, il sera également stocké dans un bloc.

Initialement, le nombre de partitions dans le RDD est déterminé par le nombre de blocs lisant les données.

Les données de partition dans ce dernier RDD, à l'exception de la fonction KV, correspondent au résultat du traitement logique des données de partition dans le RDD précédent. Par défaut, si les partitions suivantes ne sont pas traitées, le nombre de partitions de tous les RDD suivants dépend du premier RDD.

Il y a plusieurs partitions dans le RDD final, et vous verrez plusieurs fichiers de résultats dans HDFS à l'avenir (HDFS -> RDD -> HDFS)

2. L'opérateur agit sur chaque partition (chaque partition sera traitée)
3. Il existe certaines dépendances entre RDD et RDD

1) S'appuie étroitement sur une relation un-à-un dans laquelle les données d'une certaine partition du RDD précédent n'iront que vers une partition unique du prochain RDD (ou plusieurs partitions précédentes peuvent aller vers la partition suivante)

2) Une relation un-à-plusieurs avec une large dépendance à l'égard d'une certaine partition dans le RDD précédent entrera dans différentes partitions dans le RDD ultérieur. Vous pouvez également juger en vérifiant si un mélange est généré.

3) L'ensemble du travail Spark sera divisé en plusieurs étapes par le nombre de dépendances larges, Num(stage) = Num(wide dependances) + 1

4) Lorsque vous rencontrez l'opérateur qui génère le shuffle, cela impliquePhénomène d'écriture des données du RDD précédent sur le disque et de lecture des données du disque vers le RDD suivant.

Remarque : Lorsque l'exécution est déclenchée pour la première fois, il n'y a aucune donnée sur le disque, l'exécution démarrera donc à partir du premier RDD généré.

Lorsque la même exécution est déclenchée à plusieurs reprises, pour le même graphe acyclique dirigé par DAG, l'exécution démarrera directement à partir du RDD après la lecture aléatoire (en omettant le processus d'écriture des données sur le disque à partir du RDD précédent) et pourra être lue directement à partir du disque. données.

5) **En une seule étape,RDD aAvec plusieurs partitions, il y aura plusieurs tâches parallèles.

4. L'opérateur kv ne peut agir que sur le RDD du kv.
5. Spark fournira la méthode de calcul de tâche optimale, en déplaçant uniquement les calculs mais pas les données.

L'un des principes de conception de Spark estlocalisation des données(Data Locality), c'est-à-dire essayer de laisser les tâches informatiques être exécutées sur le nœud où se trouvent les données, réduisant ainsi la surcharge de transmission des données sur le réseau.

Instance Spark : nombre de mots
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setMaster
    //将来如果是集群进行,将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
    //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
        println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
        println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
        println(s"resRDD的分区数:${resRDD.getNumPartitions}")

    //打印
    resRDD2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resRDD.saveAsTextFile("spark/data/outdata1")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
Les fonctions appelées par RDD dans Spark sont appelées opérateurs

Les opérateurs sont divisés en deux catégories :

1. Opérateur de conversion (RDD -> RDD, logique de traitement)

2. Opérateur d'action (déclenche l'exécution du travail)

1. Opérateur de conversion
1) Carte
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
      println("==============处理后的数据========================")
      val array1: Array[String] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2) filtre
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
      var b: Boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderRDD.foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3) Carte plate
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo3FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")

    /**
     * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
      println("===============一条数据====================")
      line.split("\|")
    })

    rdd1.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
4) échantillon
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withReplacement :
     * 为True时,抽样结果中可能会包含重复的元素。
     * 为False时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5)GroupePar
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupBy")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))

    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * groupBy算子的使用
     *
     * 1、groupBy的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
//    groupRDD.foreach(println)

    val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
      val clazz: String = kv._1
      val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size

      (clazz, avgAge)
    })
    resKvRDD.foreach(println)

//    while (true){
//
//    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

Insérer la description de l'image ici

6)groupByKey
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo6GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))


    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
     * 也就说,只有kv格式的RDD才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

    val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
    resKvRDD2.foreach(println)

    /**
     * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
     * 1、代码格式上:
     * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
     * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
     *
     * 2、执行shuffle数据量来看
     *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
     *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
     */

    while (true) {

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Insérer la description de l'image ici

7) réduire par clé
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo7ReduceByKey {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    //分别使用groupByKey和reduceBykey计算每个学生的总分
    val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(id: String, _, score: String) =>
        (id, score.toInt)
    }

    /**
     * groupByKey实现
     */
//        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
//        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
//        resRDD1.foreach(println)

    /**
     * reduceByKey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    resRDD2.foreach(println)


    /**
     * 面试题:
     * groupByKey与reduceBykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的RDD才能调用
     * 不同点:
     * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
     */

    while (true){

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

Insérer la description de l'image ici

8) syndicat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getNumPartitions}")

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getNumPartitions}")

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
    //    rdd1.union(rdd3)
    val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
    resRDD1.foreach(println)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resRDD1的分区数:2
     */
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

9) rejoindre

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * join算子也要作用在kv格式的RDD上
 */
object Demo9Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, like: String)) =>
            (id, name, like)
        }
        resRDD2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), like: String)) =>
            (id, name, like)
          case (id: String, (None, like: String)) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    val resRDD2: RDD[(String, String, String)] = resRDD1.map {
      case (id: String, (name: String, Some(like: String))) =>
        (id, name, like)
      case (id: String, (name: String, None)) =>
        (id, name, "此人无爱好")
    }
    resRDD2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    val resRDD3: RDD[(String, String, String)] = resRDD2.map {
      case (id: String, (Some(name), Some(like))) =>
        (id, name, like)
      case (id: String, (Some(name), None)) =>
        (id, name, "此人无爱好")
      case (id: String, (None, Some(like))) =>
        (id, "查无此人", like)
    }
    resRDD3.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110