Partage de technologie

Présentation de Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Présentation de Spark SQL

Spark SQL Il s'agit d'un module d'Apache Spark spécifiquement utilisé pour traiter des données structurées. Il intègre les puissantes fonctions de requête SQL et de programmation Spark, rendant le traitement du Big Data plus efficace et plus facile. Grâce à Spark SQL, les utilisateurs peuvent utiliser des requêtes SQL directement dans Spark ou utiliser les API DataFrame et DataSet pour les opérations sur les données.

1. Architecture SparkSQL

L'architecture de Spark SQL se compose principalement des composants suivants :

  1. SparkSession: Un point d'entrée unifié pour les applications Spark, utilisé pour créer des DataFrames, des DataSets et exécuter des requêtes SQL.
  2. Optimiseur de catalyseur: Le moteur d'optimisation des requêtes de Spark SQL, chargé d'analyser, d'analyser, d'optimiser et de générer des plans d'exécution physique.
  3. API DataFrame et DataSet: Fournit une interface de programmation orientée objet et prend en charge des méthodes d'exploitation de données riches.
  4. Interface de source de données: Prend en charge plusieurs sources de données, telles que HDFS, S3, HBase, Cassandra, Hive, etc.
  5. moteur d'exécution: Convertissez les plans de requêtes optimisés en tâches d'exécution et exécutez ces tâches en parallèle sur un cluster distribué.

2. Fonctionnalités Spark SQL

  • Interface d'accès aux données unifiée: Prend en charge plusieurs sources de données (telles que CSV, JSON, Parquet, Hive, JDBC, HBase, etc.) et fournit une interface de requête cohérente.
  • API DataFrame et ensemble de données: Fournit une interface de programmation orientée objet, prend en charge les opérations de type sécurisé et facilite le traitement des données.
  • Optimiseur de catalyseur: convertissez automatiquement les requêtes des utilisateurs en plans d'exécution efficaces pour améliorer les performances des requêtes.
  • Intégration avec Hive: intègre de manière transparente Hive, peut accéder directement aux données Hive existantes et utiliser l'UDF et l'UDAF de Hive.
  • haute performance: Obtenez des performances de requêtes et une gestion de la mémoire efficaces grâce à l'optimiseur Catalyst et au moteur d'exécution Tungsten.
  • Diverses méthodes de fonctionnement: Prend en charge deux modes de fonctionnement : programmation SQL et API, avec une grande flexibilité.
  • Interface d'outil externe: Fournit une interface JDBC/ODBC pour les outils tiers permettant d'utiliser Spark pour le traitement des données.
  • Interface avancée: Fournit une interface de niveau supérieur pour traiter les données de manière pratique.

3. Principe de fonctionnement de Spark SQL

Insérer la description de l'image ici

Analyse des requêtes : analyse les requêtes SQL dans des arbres de syntaxe abstraite (AST).

Génération de plan logique : Convertissez AST en plan logique non optimisé.

Optimisation du plan logique: utilisez l'optimiseur Catalyst pour optimiser une série de règles sur le plan logique.

Génération de plans physiques: Convertissez le plan logique optimisé en un ou plusieurs plans physiques et sélectionnez le plan physique optimal.

Exécution: Convertissez le plan physique en RDD et exécutez-le en parallèle sur le cluster.

4. Présentation de l'API Spark SQL

SparkContext : SparkContext est le point d'entrée principal de l'application Spark et est responsable de la connexion au cluster Spark, de la gestion des ressources et de la planification des tâches. Après Spark 2.0, il est recommandé d'utiliser SparkSession au lieu de SparkContext.

Contexte SQL : SQLContext est le point d'entrée de programmation pour Spark SQL, permettant aux utilisateurs d'effectuer un traitement de données via des requêtes SQL ou l'API DataFrame. Il fournit les fonctionnalités de base de Spark SQL.

Contexte de la ruche: HiveContext est un sous-ensemble de SQLContext, qui ajoute la prise en charge intégrée de Hive. Il peut accéder directement aux données et métadonnées dans Hive, à l'aide de l'UDF et de l'UDAF de Hive.

SparkSession : SparkSession est un nouveau concept introduit dans Spark 2.0. Il fusionne les fonctions de SQLContext et HiveContext et fournit une interface de programmation unifiée. SparkSession est le point d'entrée recommandé pour Spark SQL, prenant en charge le traitement des données à l'aide des API DataFrame et Dataset.

Notes sur la création de SparkContext et SparkSession : Si vous devez créer SparkContext et SparkSession en même temps, vous devez d'abord créer SparkContext, puis créer SparkSession. Si vous créez d'abord SparkSession, puis créez SparkContext, une exception se produira car un seul SparkContext peut s'exécuter dans la même JVM.

5. Dépendances Spark SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Ensemble de données Spark SQL

Dans Spark SQL, les ensembles de données sont principalement divisés dans les types suivants : DataFrame et Dataset. Ce sont des abstractions de base pour le traitement et la manipulation de données structurées et semi-structurées.

1. Trame de données

Dataset est une nouvelle structure de données abstraite introduite dans Spark 2.0. Elle est fortement typée et peut stocker des objets JVM. L'API Dataset combine la simplicité opérationnelle et la sécurité des types de DataFrame et convient aux scénarios qui nécessitent un niveau plus élevé de contrôle des types de données et un style de programmation orienté objet. Les spécificités sont les suivantes :

  • Semblable à un tableau à deux dimensions: DataFrame est similaire à une table bidimensionnelle dans une base de données relationnelle traditionnelle.
  • Schéma (informations sur la structure des données): Le schéma est ajouté sur la base de RDD pour décrire les informations de la structure de données.
  • Prise en charge des types de données imbriqués: Le schéma de DataFrame prend en charge les types de données imbriqués, tels que structmap etarray
  • API d'opérations SQL riches: Fournit davantage d'API similaires aux opérations SQL pour faciliter l'interrogation et le fonctionnement des données.

2. Ensemble de données

Dataset est une nouvelle structure de données abstraite introduite dans Spark 2.0. Elle est fortement typée et peut stocker des objets JVM. L'API Dataset combine la simplicité opérationnelle et la sécurité des types de DataFrame et convient aux scénarios qui nécessitent un niveau plus élevé de contrôle des types de données et un style de programmation orienté objet. Les spécificités sont les suivantes :

  • Fortement typé: Collecte de données plus générale introduite dans Spark 1.6, Dataset est fortement typé et fournit des opérations de type sécurisé.
  • RDD + Schéma: On peut considérer que Dataset est une combinaison de RDD et de Schema. Il possède à la fois la capacité de calcul distribué de RDD et les informations de Schema décrivant la structure des données.
  • S'applique aux objets spécifiques au domaine: Une collection fortement typée d'objets spécifiques à un domaine qui peuvent être stockés et manipulés.
  • Fonctionnement parallèle: Les conversions et les opérations peuvent être effectuées en parallèle à l'aide de fonctions ou d'opérations associées.

3. La relation entre DataFrame et Dataset

  • DataFrame est un ensemble de données spécial: DataFrame est un cas particulier de Dataset, c'est-à-dire DataFrame = Dataset[Row]
  • Unification des méthodes d'abstraction et d'exploitation des données: DataFrame et Dataset unifient l'abstraction des données et les méthodes d'exploitation de Spark SQL, offrant des capacités de traitement de données flexibles et puissantes.

7. Utilisation de base de Spark SQL

1. Scala crée un objet SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Comment créer un DataFrame et un Dataset

1. Créer à partir d'une collection

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Lire à partir du système de fichiers

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Lire à partir d'une base de données relationnelle

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Lire à partir de sources de données non structurées

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Créer manuellement un ensemble de données

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. API DataFrame

Premier exemple de grammaire

Données de simulation (1000 éléments) :

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Exigence : quelles combinaisons de villes et de sexes ont l'âge moyen le plus élevé avec des populations plus importantes (nombre de cartes d'identité &gt;50), et le classement de ces combinaisons au sein de leurs sexes respectifs.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

résultat:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Deuxième exemple de syntaxe : view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Troisième exemple de syntaxe : rejoindre

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left joint gauche,rignt rejoindre à droite,full jointure externe complète,antiensemble de différence gauche,semicarrefour à gauche

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

résultat

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43