Compartir tecnología

Conceptos básicos de Spark para el aprendizaje de Big Data

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Conceptos básicos de chispa

Breve descripción

1. Características de la ejecución del trabajo Spark:

(1) Solo cuando se encuentre un operador de acción, se activará todo el trabajo de Spark para su ejecución.

(2) Encontrado varias veces, ejecutado varias veces

2. RDD: conjunto de datos distribuidos elásticos

Flexibilidad: la cantidad de datos puede ser grande o pequeña

RDD es similar a un contenedor, pero no almacena datos, sino lógica de cálculo.

Cuando se encuentra un operador de acción, se activará la ejecución de todo el trabajo de Spark. A partir del primer RDD, los datos comenzarán a fluir.

Los datos son solo una relación fluida entre RDD y no se almacenarán.

La cantidad de datos que fluyen puede ser grande o pequeña, por eso se llama elasticidad.

repartido:

Spark esencialmente necesita leer datos de HDFS. HDFS está distribuido y los bloques de datos pueden estar en diferentes nodos de datos en el futuro.

Los datos que fluyen en RDD pueden provenir de datos de bloques en diferentes nodos de datos.

conjunto de datos:

Durante el proceso de flujo de cálculo, RDD se puede considerar brevemente como un contenedor. Hay datos en el contenedor que no se almacenan en la memoria de forma predeterminada.

Más adelante habrá una forma de almacenar los datos de un RDD en el disco.

Cinco características principales de RDD: (¡debe preguntar en las entrevistas!)
1. RDD se compone de una serie de particiones.

1) El parámetro minPartitions al leer un archivo solo puede determinar el número mínimo de particiones. El número real de particiones RDD después de leer el archivo está determinado por el contenido de los datos en sí y la distribución del clúster.

2) Si el número de minPartitions establecido es menor que el número de bloques, el número de particiones en realidad está determinado por el número de bloques.

3) Al llamar al operador que genera reproducción aleatoria, puede pasar numPartitions (por ejemplo: groupby()), lo que realmente puede cambiar el número de particiones del RDD. La cantidad de particiones configuradas determinará cuántas particiones tendrá el RDD final. tener.

4) El archivo se almacenará en HDFS en forma de bloques. Si el archivo no alcanza el valor predeterminado de 128 M, también se almacenará en un bloque.

Inicialmente, la cantidad de particiones en el RDD está determinada por la cantidad de bloques que leen datos.

Los datos de la partición en el último RDD, excepto la función KV, corresponden al resultado del procesamiento lógico de los datos de la partición en el RDD anterior. De forma predeterminada, si no se procesan las particiones posteriores, el número de particiones de todos los RDD posteriores depende del primer RDD.

Hay varias particiones en el RDD final y verá varios archivos de resultados en HDFS en el futuro (HDFS -> RDD -> HDFS)

2. El operador actúa en cada partición (se procesará cada partición)
3. Existen algunas dependencias entre RDD y RDD

1) Se basa estrictamente en una relación uno a uno donde los datos de una determinada partición en el RDD anterior solo irán a una partición única en el siguiente RDD (o varias particiones anteriores pueden ir a la siguiente partición)

2) La relación de uno a muchos de una determinada partición en el RDD anterior que depende en gran medida del RDD anterior ingresará a diferentes particiones en el siguiente RDD. También puede juzgar si se genera una mezcla.

3) Todo el trabajo de Spark se dividirá en varias etapas por el número de dependencias amplias, Num(etapa) = Num(dependencias amplias) + 1

4) Cuando se encuentra con el operador que genera reproducción aleatoria, implicaEl fenómeno de escribir datos del RDD anterior en el disco y leer datos del disco en el siguiente RDD.

Nota: Cuando se activa la ejecución por primera vez, no hay datos en el disco, por lo que la ejecución comenzará desde el primer RDD generado.

Cuando la misma ejecución se activa repetidamente, para el mismo gráfico acíclico dirigido por DAG, la ejecución comenzará directamente desde el RDD después de la reproducción aleatoria (omitiendo el proceso de escribir datos en el disco desde el RDD anterior) y se puede leer directamente desde el disco. . datos.

5) **En una etapa,RDD tieneCon varias particiones, habrá varias tareas paralelas.

4. El operador de kv solo puede actuar sobre el RDD de kv.
5. Spark proporcionará el método óptimo de cálculo de tareas, solo moviendo cálculos pero no datos.

Uno de los principios de diseño de Spark eslocalización de datos(Localidad de datos), es decir, intentar que las tareas informáticas se ejecuten en el nodo donde se encuentran los datos, reduciendo así la sobrecarga de transmisión de datos de la red.

Instancia de Spark: recuento de palabras
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setMaster
    //将来如果是集群进行,将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
    //    println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
        println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
        println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
        println(s"resRDD的分区数:${resRDD.getNumPartitions}")

    //打印
    resRDD2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resRDD.saveAsTextFile("spark/data/outdata1")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
Las funciones llamadas por RDD en Spark se llaman operadores

Los operadores se dividen en dos categorías:

1. Operador de conversión (RDD -> RDD, lógica de procesamiento)

2. Operador de acción (activa la ejecución del trabajo)

1. Operador de conversión
1) Mapa
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
      println("==============处理后的数据========================")
      val array1: Array[String] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2) filtro
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
      var b: Boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderRDD.foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3) Mapa plano
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo3FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")

    /**
     * flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
      println("===============一条数据====================")
      line.split("\|")
    })

    rdd1.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
4) muestra
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val sc: SparkContext = new SparkContext(conf)

    //===============================================================
    val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withReplacement :
     * 为True时,抽样结果中可能会包含重复的元素。
     * 为False时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5) Agrupar por
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupBy")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))

    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * groupBy算子的使用
     *
     * 1、groupBy的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
//    groupRDD.foreach(println)

    val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
      val clazz: String = kv._1
      val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size

      (clazz, avgAge)
    })
    resKvRDD.foreach(println)

//    while (true){
//
//    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

Insertar descripción de la imagen aquí

6) Agrupar por clave
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo6GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("groupByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))


    //像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
    val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(_, _, age: String, _, clazz: String) =>
        (clazz, age.toInt)
    }

    /**
     * GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
     * 也就说,只有kv格式的RDD才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()

    val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
    resKvRDD2.foreach(println)

    /**
     * 面试题:spark core中 groupBy算子与groupByKey算子的区别?
     * 1、代码格式上:
     * groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
     * groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
     *
     * 2、执行shuffle数据量来看
     *  groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
     *  所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
     */

    while (true) {

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Insertar descripción de la imagen aquí

7) reducir por clave
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo7ReduceByKey {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
    //分别使用groupByKey和reduceBykey计算每个学生的总分
    val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
      case Array(id: String, _, score: String) =>
        (id, score.toInt)
    }

    /**
     * groupByKey实现
     */
//        val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
//        val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
//        resRDD1.foreach(println)

    /**
     * reduceByKey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    resRDD2.foreach(println)


    /**
     * 面试题:
     * groupByKey与reduceBykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的RDD才能调用
     * 不同点:
     * 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
     */

    while (true){

    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

Insertar descripción de la imagen aquí

8) Unión
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo8Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getNumPartitions}")

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getNumPartitions}")

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
    //    rdd1.union(rdd3)
    val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
    resRDD1.foreach(println)
    println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resRDD1的分区数:2
     */
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

9) unirse

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * join算子也要作用在kv格式的RDD上
 */
object Demo9Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("reduceByKey")

    val sc: SparkContext = new SparkContext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: RDD[(String, String)] = sc.parallelize(List(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
        val resRDD2: RDD[(String, String, String)] = resRDD1.map {
          case (id: String, (name: String, like: String)) =>
            (id, name, like)
        }
        resRDD2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val resRDD3: RDD[(String, String, String)] = resRDD2.map {
          case (id: String, (Some(name), like: String)) =>
            (id, name, like)
          case (id: String, (None, like: String)) =>
            (id, "查无此人", like)
        }
        resRDD3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    val resRDD2: RDD[(String, String, String)] = resRDD1.map {
      case (id: String, (name: String, Some(like: String))) =>
        (id, name, like)
      case (id: String, (name: String, None)) =>
        (id, name, "此人无爱好")
    }
    resRDD2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    val resRDD3: RDD[(String, String, String)] = resRDD2.map {
      case (id: String, (Some(name), Some(like))) =>
        (id, name, like)
      case (id: String, (Some(name), None)) =>
        (id, name, "此人无爱好")
      case (id: String, (None, Some(like))) =>
        (id, "查无此人", like)
    }
    resRDD3.foreach(println)


  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110