Condivisione della tecnologia

Panoramica di Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Panoramica di Spark SQL

SQL di Spark È un modulo di Apache Spark utilizzato specificatamente per elaborare dati strutturati. Integra le potenti funzioni delle query SQL e della programmazione Spark, rendendo l'elaborazione dei big data più efficiente e semplice. Tramite Spark SQL, gli utenti possono utilizzare query SQL direttamente in Spark oppure utilizzare le API DataFrame e DataSet per le operazioni sui dati.

1. Architettura Spark SQL

L'architettura di Spark SQL è costituita principalmente dai seguenti componenti:

  1. Sessione di scintilla: un punto di ingresso unificato per le applicazioni Spark, utilizzato per creare dataframe, dataset ed eseguire query SQL.
  2. Ottimizzatore del catalizzatore: il motore di ottimizzazione delle query di Spark SQL, responsabile dell'analisi, dell'analisi, dell'ottimizzazione e della generazione di piani di esecuzione fisica.
  3. API DataFrame e DataSet: Fornisce un'interfaccia di programmazione orientata agli oggetti e supporta metodi operativi ricchi di dati.
  4. Interfaccia dell'origine dati: Supporta più origini dati, come HDFS, S3, HBase, Cassandra, Hive, ecc.
  5. motore di esecuzione: converti piani di query ottimizzati in attività di esecuzione ed esegui queste attività in parallelo su un cluster distribuito.

2. Funzionalità Spark SQL

  • Interfaccia di accesso ai dati unificata: supporta più origini dati (come CSV, JSON, Parquet, Hive, JDBC, HBase, ecc.) e fornisce un'interfaccia di query coerente.
  • API DataFrame e set di dati: Fornisce un'interfaccia di programmazione orientata agli oggetti, supporta operazioni indipendenti dai tipi e facilita l'elaborazione dei dati.
  • Ottimizzatore del catalizzatore: converte automaticamente le query degli utenti in piani di esecuzione efficienti per migliorare le prestazioni delle query.
  • Integrazione con Hive: integra perfettamente Hive, può accedere direttamente ai dati Hive esistenti e utilizzare UDF e UDAF di Hive.
  • alte prestazioni: Ottieni prestazioni di query e gestione della memoria efficienti tramite l'ottimizzatore Catalyst e il motore di esecuzione Tungsten.
  • Vari metodi operativi: Supporta due modalità operative: programmazione SQL e API, con elevata flessibilità.
  • Interfaccia strumento esterno: fornisce l'interfaccia JDBC/ODBC per strumenti di terze parti per utilizzare Spark per l'elaborazione dei dati.
  • Interfaccia avanzata: Fornisce un'interfaccia di livello superiore per elaborare i dati in modo conveniente.

3. Principio di funzionamento di Spark SQL

Inserisci qui la descrizione dell'immagine

Analisi delle query: analizza le query SQL in alberi di sintassi astratti (AST).

Generazione del piano logico:Converti AST in un piano logico non ottimizzato.

Ottimizzazione del piano logico: utilizzare l'ottimizzatore Catalyst per ottimizzare una serie di regole sul piano logico.

Generazione del piano fisico: Converti il ​​piano logico ottimizzato in uno o più piani fisici e seleziona il piano fisico ottimale.

Esecuzione:Convertire il piano fisico in RDD ed eseguirlo in parallelo sul cluster.

4. Panoramica dell'API Spark SQL

Contesto di Spark : SparkContext è il punto di ingresso principale dell'applicazione Spark ed è responsabile della connessione al cluster Spark, della gestione delle risorse e della pianificazione delle attività. Dopo Spark 2.0, è consigliabile utilizzare SparkSession anziché SparkContext.

Contesto SQL : SQLContext è il punto di ingresso della programmazione per Spark SQL, che consente agli utenti di eseguire l'elaborazione dei dati tramite query SQL o API DataFrame. Fornisce funzionalità Spark SQL di base.

Contesto alveare: HiveContext è un sottoinsieme di SQLContext, che aggiunge il supporto integrato per Hive. Può accedere direttamente ai dati e ai metadati in Hive, utilizzando UDF e UDAF di Hive.

Sessione di scintilla : SparkSession è un nuovo concetto introdotto in Spark 2.0 unisce le funzioni di SQLContext e HiveContext e fornisce un'interfaccia di programmazione unificata. SparkSession è il punto di ingresso consigliato per Spark SQL, che supporta l'elaborazione dei dati utilizzando le API DataFrame e Dataset.

Note sulla creazione di SparkContext e SparkSession : se è necessario creare SparkContext e SparkSession contemporaneamente, è necessario creare prima SparkContext e quindi creare SparkSession. Se crei prima SparkSession e poi crei SparkContext, si verificherà un'eccezione perché è possibile eseguire solo un SparkContext nella stessa JVM.

5. Dipendenze Spark SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Set di dati Spark SQL

In Spark SQL, i set di dati sono principalmente suddivisi nei seguenti tipi: DataFrame e Dataset. Sono astrazioni fondamentali per l'elaborazione e la manipolazione di dati strutturati e semi-strutturati.

1、DataFrame

Il set di dati è una nuova struttura di dati astratti introdotta in Spark 2.0. È fortemente tipizzata e può archiviare oggetti JVM. L'API Dataset combina la semplicità operativa e l'indipendenza dai tipi di DataFrame ed è adatta per scenari che richiedono un livello più elevato di controllo del tipo di dati e uno stile di programmazione orientato agli oggetti. Le caratteristiche specifiche sono le seguenti:

  • Simile ad un tavolo bidimensionale: DataFrame è simile a una tabella bidimensionale in un database relazionale tradizionale.
  • Schema (informazioni sulla struttura dei dati): Lo schema viene aggiunto sulla base di RDD per descrivere le informazioni della struttura dei dati.
  • Supporta tipi di dati nidificati: Lo schema di DataFrame supporta tipi di dati annidati, come structmap Earray
  • API operativa SQL avanzata: fornisce più API simili alle operazioni SQL per facilitare l'esecuzione delle query e delle operazioni sui dati.

2. Set di dati

Il set di dati è una nuova struttura di dati astratti introdotta in Spark 2.0. È fortemente tipizzata e può archiviare oggetti JVM. L'API Dataset combina la semplicità operativa e l'indipendenza dai tipi di DataFrame ed è adatta per scenari che richiedono un livello più elevato di controllo del tipo di dati e uno stile di programmazione orientato agli oggetti. Le caratteristiche specifiche sono le seguenti:

  • Fortemente digitato: una raccolta dati più generale introdotta in Spark 1.6, Dataset è fortemente tipizzata e fornisce operazioni indipendenti dai tipi.
  • RDD + Schema: Si può considerare che Dataset sia una combinazione di RDD e Schema ha le capacità di calcolo distribuito di RDD e le informazioni di Schema che descrivono la struttura dei dati.
  • Si applica agli oggetti specifici del dominio: una raccolta fortemente tipizzata di oggetti specifici del dominio che possono essere archiviati e manipolati.
  • Funzionamento parallelo: le conversioni e le operazioni possono essere eseguite in parallelo utilizzando funzioni o operazioni correlate.

3. La relazione tra DataFrame e Dataset

  • DataFrame è un set di dati speciale: DataFrame è un caso speciale di Dataset, cioè DataFrame = Dataset[Row]
  • Unificazione dei metodi di astrazione e funzionamento dei dati: DataFrame e Dataset uniscono l'astrazione dei dati e i metodi operativi di Spark SQL, fornendo funzionalità di elaborazione dei dati flessibili e potenti.

7. Utilizzo di base di Spark Sql

1. Scala crea l'oggetto SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Come creare DataFrame e Dataset

1. Crea da una raccolta

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Leggere dal file system

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Leggi dal database relazionale

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Leggi da origini dati non strutturate

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Creare manualmente il set di dati

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、API DataFrame

Esempio di grammatica uno

Dati di simulazione (1000 articoli):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Requisito: quali combinazioni di città e genere hanno l'età media più alta con una popolazione più numerosa (numero di ID &gt;50) e la classificazione di queste combinazioni all'interno dei rispettivi generi.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

risultato:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Esempio di sintassi due: vista, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Esempio di sintassi tre: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left a sinistra unisciti,rignt giusto unisciti,full unione esterna completa,antiimpostazione della differenza sinistra,semiincrocio a sinistra

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

risultato

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43