技術共有

ビッグデータ学習のための Spark の基本

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

スパークの基本

簡単な説明

1. Spark ジョブ実行の特徴:

(1) アクション演算子が見つかった場合にのみ、Spark ジョブ全体が実行のためにトリガーされます。

(2) 何度か遭遇し、何度も実行された

2. RDD: 弾力性のある分散データセット

柔軟性: データ量は大きくても小さくても可能です

RDD はコンテナに似ていますが、データではなく計算ロジックを保存します。

アクション演算子が見つかると、最初の RDD から開始して Spark ジョブ全体が実行され始めます。

データは RDD 間で流れる関係にすぎず、保存されません。

流れるデータの量は多い場合も少ない場合もあり、これを弾力性と呼びます

配布:

Spark は基本的に HDFS からデータを読み取る必要があります。HDFS は分散されており、将来的にはデータ ブロックが異なるデータノードに存在する可能性があります。

RDD を流れるデータは、異なるデータノードのブロック データから取得される場合があります。

データセット:

計算フローのプロセス中、RDD は簡単にコンテナと見なすことができます。コンテナ内には、デフォルトではメモリに格納されないデータがあります。

RDD のデータを後でディスクに保存する方法が用意される予定です。

RDD の 5 つの主要な特徴: (面接で必ず尋ねてください!)
1. RDD は一連のパーティションで構成されます

1) ファイル読み取り時の minPartitions パラメーターは、最小パーティション数のみを決定します。ファイル読み取り後の実際の RDD パーティション数は、データの内容自体とクラスターの分布によって決まります。

2) 設定された minPartitions の数がブロックの数よりも小さい場合、パーティションの数は実際にはブロックの数によって決まります。

3) シャッフルを生成するオペレーターを呼び出すときに、numPartitions (例: groupby()) を渡すことができます。これにより、RDD のパーティション数が実際に変更され、設定されたパーティションの数によって、最終的な RDD のパーティション数が決まります。持っている。

4) ファイルはブロック形式で HDFS に保存されます。ファイルがデフォルト値の 128M に達しない場合も、ブロックに保存されます。

最初に、RDD 内のパーティションの数は、データを読み取るブロックの数によって決まります。

後段の RDD のパーティションデータは、KV 関数を除き、前段の RDD のパーティションデータを論理処理した結果に相当します。デフォルトでは、後続のパーティションが処理されない場合、後続のすべての RDD のパーティション数は最初の RDD に依存します。

最終的な RDD にはいくつかのパーティションがあり、将来 HDFS にいくつかの結果ファイルが表示されることになります (HDFS -> RDD -> HDFS)。

2. オペレーターは各パーティションに作用します (各パーティションが処理されます)
3. RDD と RDD の間にはいくつかの依存関係があります。

1) 1 対 1 の関係に狭く依存しており、前の RDD の特定のパーティションのデータは次の RDD の一意のパーティションにのみ送信されます (または、前の複数のパーティションが次のパーティションに送信される可能性があります)。

2) 前の RDD に大きく依存する、前の RDD 内の特定のパーティションの 1 対多の関係が、次の RDD で異なるパーティションに入る場合、シャッフルが生成されるかどうかを確認することでも判断できます。

3) Spark ジョブ全体は、幅広い依存関係の数に応じていくつかのステージに分割されます。Num(ステージ) = Num(幅広い依存関係) + 1

4) シャッフルを生成するオペレーターに遭遇すると、前のRDDからディスクにデータを書き込み、ディスクから次のRDDにデータを読み込む現象。

注: 初めて実行がトリガーされたときは、ディスク上にデータがないため、実行は最初に生成された RDD から開始されます。

同じ実行が繰り返しトリガーされると、同じ DAG 有向非巡回グラフの場合、シャッフル後の RDD から直接実行が開始され (前の RDD からディスクにデータを書き込むプロセスが省略され)、ディスクから直接読み取ることができます。 。 データ。

5) **ある段階では、RDDには複数のパーティションがある場合、複数の並列タスクが存在します。

4. kv オペレーターは、kv の RDD に対してのみ動作できます。
5. Spark は最適なタスク計算方法を提供します。移動計算のみであり、データは提供されません。

Spark の設計原則の 1 つは次のとおりです。データのローカリゼーション(データ局所性)、つまり、データが配置されているノード上でコンピューティング タスクを実行するように試み、それによってデータのネットワーク送信オーバーヘッドを削減します。

Spark インスタンス: ワードカウント
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setMaster
    //将来如果是集群进行,将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
    //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
        println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
        println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
        println(s"resRDD的分区数:${resRDD.getNumPartitions}")

    //打印
    resRDD2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resRDD.saveAsTextFile("spark/data/outdata1")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
Spark の RDD によって呼び出される関数は演算子と呼ばれます

演算子は 2 つのカテゴリに分類されます。

1. 変換演算子(RDD -> RDD、処理ロジック)

2. アクション演算子 (ジョブの実行をトリガー)

1. 変換演算子
1)地図
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
      println("==============处理后的数据========================")
      val array1: Array[String] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2)フィルター
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
      var b: Boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderRDD.foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3)フラットマップ
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo3FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")

    /**
     * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
      println("===============一条数据====================")
      line.split("\|")
    })

    rdd1.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
4)サンプル
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withReplacement :
     * 为True时,抽样结果中可能会包含重复的元素。
     * 为False时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5)グループ化
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupBy")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))

    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * groupBy算子的使用
     *
     * 1、groupBy的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
//    groupRDD.foreach(println)

    val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
      val clazz: String = kv._1
      val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size

      (clazz, avgAge)
    })
    resKvRDD.foreach(println)

//    while (true){
//
//    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

ここに画像の説明を挿入します

6)グループ化
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo6GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))


    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
     * 也就说,只有kv格式的RDD才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

    val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
    resKvRDD2.foreach(println)

    /**
     * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
     * 1、代码格式上:
     * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
     * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
     *
     * 2、执行shuffle数据量来看
     *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
     *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
     */

    while (true) {

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

ここに画像の説明を挿入します

7)キーによる削減
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo7ReduceByKey {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    //分别使用groupByKey和reduceBykey计算每个学生的总分
    val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(id: String, _, score: String) =>
        (id, score.toInt)
    }

    /**
     * groupByKey实现
     */
//        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
//        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
//        resRDD1.foreach(println)

    /**
     * reduceByKey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    resRDD2.foreach(println)


    /**
     * 面试题:
     * groupByKey与reduceBykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的RDD才能调用
     * 不同点:
     * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
     */

    while (true){

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

ここに画像の説明を挿入します

8)組合
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getNumPartitions}")

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getNumPartitions}")

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
    //    rdd1.union(rdd3)
    val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
    resRDD1.foreach(println)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resRDD1的分区数:2
     */
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

9)参加する

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * join算子也要作用在kv格式的RDD上
 */
object Demo9Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, like: String)) =>
            (id, name, like)
        }
        resRDD2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), like: String)) =>
            (id, name, like)
          case (id: String, (None, like: String)) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    val resRDD2: RDD[(String, String, String)] = resRDD1.map {
      case (id: String, (name: String, Some(like: String))) =>
        (id, name, like)
      case (id: String, (name: String, None)) =>
        (id, name, "此人无爱好")
    }
    resRDD2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    val resRDD3: RDD[(String, String, String)] = resRDD2.map {
      case (id: String, (Some(name), Some(like))) =>
        (id, name, like)
      case (id: String, (Some(name), None)) =>
        (id, name, "此人无爱好")
      case (id: String, (None, Some(like))) =>
        (id, "查无此人", like)
    }
    resRDD3.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110