Technologieaustausch

Übersicht über Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Übersicht über Spark SQL

Spark SQL Es handelt sich um ein Modul von Apache Spark, das speziell zur Verarbeitung strukturierter Daten verwendet wird. Es integriert die leistungsstarken Funktionen der SQL-Abfrage und der Spark-Programmierung und macht die Verarbeitung großer Datenmengen effizienter und einfacher. Über Spark SQL können Benutzer SQL-Abfragen direkt in Spark verwenden oder die DataFrame- und DataSet-APIs für Datenoperationen verwenden.

1. Spark SQL-Architektur

Die Architektur von Spark SQL besteht hauptsächlich aus folgenden Komponenten:

  1. SparkSession: Ein einheitlicher Einstiegspunkt für Spark-Anwendungen, der zum Erstellen von DataFrames, DataSets und zum Ausführen von SQL-Abfragen verwendet wird.
  2. Katalysatoroptimierer: Die Abfrageoptimierungs-Engine von Spark SQL, verantwortlich für das Parsen, Analysieren, Optimieren und Generieren physischer Ausführungspläne.
  3. DataFrame- und DataSet-API: Bietet eine objektorientierte Programmierschnittstelle und unterstützt umfangreiche Datenoperationsmethoden.
  4. Datenquellenschnittstelle: Unterstützt mehrere Datenquellen wie HDFS, S3, HBase, Cassandra, Hive usw.
  5. Ausführungsmaschine: Optimierte Abfragepläne in Ausführungsaufgaben umwandeln und diese Aufgaben parallel auf einem verteilten Cluster ausführen.

2. Spark SQL-Funktionen

  • Einheitliche Datenzugriffsschnittstelle: Unterstützt mehrere Datenquellen (wie CSV, JSON, Parquet, Hive, JDBC, HBase usw.) und bietet eine konsistente Abfrageschnittstelle.
  • DataFrame- und Dataset-API: Bietet eine objektorientierte Programmierschnittstelle, unterstützt typsichere Vorgänge und erleichtert die Datenverarbeitung.
  • Katalysatoroptimierer: Benutzerabfragen automatisch in effiziente Ausführungspläne umwandeln, um die Abfrageleistung zu verbessern.
  • Integration mit Hive: Integriert Hive nahtlos, kann direkt auf vorhandene Hive-Daten zugreifen und UDF und UDAF von Hive verwenden.
  • Hochleistung: Erzielen Sie eine effiziente Abfrageleistung und Speicherverwaltung durch den Catalyst-Optimierer und die Tungsten-Ausführungs-Engine.
  • Verschiedene Operationsmethoden: Unterstützt zwei Betriebsmodi: SQL- und API-Programmierung mit hoher Flexibilität.
  • Externe Tool-Schnittstelle: Stellt eine JDBC/ODBC-Schnittstelle für Tools von Drittanbietern bereit, um Spark für die Datenverarbeitung zu verwenden.
  • Erweiterte Schnittstelle: Bietet eine übergeordnete Schnittstelle zur bequemen Verarbeitung von Daten.

3. Funktionsprinzip von Spark SQL

Fügen Sie hier eine Bildbeschreibung ein

Abfrageparsing: Analysiert SQL-Abfragen in abstrakte Syntaxbäume (AST).

Logische Planerstellung:Konvertieren Sie AST in einen nicht optimierten logischen Plan.

Logische Planoptimierung: Verwenden Sie den Catalyst-Optimierer, um eine Reihe von Regeln für den logischen Plan zu optimieren.

Physische Planerstellung: Konvertieren Sie den optimierten logischen Plan in einen oder mehrere physische Pläne und wählen Sie den optimalen physischen Plan aus.

Ausführung: Konvertieren Sie den physischen Plan in RDD und führen Sie ihn parallel im Cluster aus.

4. Übersicht über die Spark SQL-API

SparkContext : SparkContext ist der Haupteinstiegspunkt der Spark-Anwendung und verantwortlich für die Verbindung zum Spark-Cluster, die Verwaltung von Ressourcen und die Aufgabenplanung. Nach Spark 2.0 wird empfohlen, SparkSession anstelle von SparkContext zu verwenden.

SQLContext : SQLContext ist der Programmiereinstiegspunkt für Spark SQL und ermöglicht Benutzern die Datenverarbeitung über SQL-Abfragen oder die DataFrame-API. Es bietet grundlegende Spark SQL-Funktionalität.

HiveContext: HiveContext ist eine Teilmenge von SQLContext, die integrierte Unterstützung für Hive hinzufügt. Es kann mithilfe von Hives UDF und UDAF direkt auf Daten und Metadaten in Hive zugreifen.

SparkSession : SparkSession ist ein neues Konzept, das in Spark 2.0 eingeführt wurde. Es vereint die Funktionen von SQLContext und HiveContext und bietet eine einheitliche Programmierschnittstelle. SparkSession ist der empfohlene Einstiegspunkt für Spark SQL und unterstützt die Datenverarbeitung mithilfe der DataFrame- und Dataset-APIs.

Hinweise zum Erstellen von SparkContext und SparkSession : Wenn Sie SparkContext und SparkSession gleichzeitig erstellen müssen, müssen Sie zuerst SparkContext und dann SparkSession erstellen. Wenn Sie zuerst SparkSession und dann SparkContext erstellen, tritt eine Ausnahme auf, da nur ein SparkContext in derselben JVM ausgeführt werden kann.

5. Spark SQL-Abhängigkeiten

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Spark SQL-Datensatz

In Spark SQL werden Datensätze hauptsächlich in die folgenden Typen unterteilt: DataFrame und Dataset. Sie sind Kernabstraktionen für die Verarbeitung und Bearbeitung strukturierter und halbstrukturierter Daten.

1. Datenrahmen

Dataset ist eine neue abstrakte Datenstruktur, die in Spark 2.0 eingeführt wurde. Sie ist stark typisiert und kann JVM-Objekte speichern. Die Dataset-API kombiniert die betriebliche Einfachheit und Typsicherheit von DataFrame und eignet sich für Szenarien, die ein höheres Maß an Datentypkontrolle und einen objektorientierten Programmierstil erfordern. Die spezifischen Merkmale sind wie folgt:

  • Ähnlich einer zweidimensionalen Tabelle: DataFrame ähnelt einer zweidimensionalen Tabelle in einer herkömmlichen relationalen Datenbank.
  • Schema (Datenstrukturinformationen): Schema wird auf Basis von RDD hinzugefügt, um die Informationen der Datenstruktur zu beschreiben.
  • Unterstützt verschachtelte Datentypen: Das Schema von DataFrame unterstützt verschachtelte Datentypen, z structmap Undarray
  • Rich SQL-Operations-API: Bietet mehr APIs ähnlich wie SQL-Operationen, um die Datenabfrage und -operation zu erleichtern.

2. Datensatz

Dataset ist eine neue abstrakte Datenstruktur, die in Spark 2.0 eingeführt wurde. Sie ist stark typisiert und kann JVM-Objekte speichern. Die Dataset-API kombiniert die betriebliche Einfachheit und Typsicherheit von DataFrame und eignet sich für Szenarien, die ein höheres Maß an Datentypkontrolle und einen objektorientierten Programmierstil erfordern. Die spezifischen Merkmale sind wie folgt:

  • Stark getippt: Dataset ist eine allgemeinere Datensammlung, die in Spark 1.6 eingeführt wurde. Sie ist stark typisiert und bietet typsichere Operationen.
  • RDD + Schema: Es kann davon ausgegangen werden, dass der Datensatz eine Kombination aus RDD und Schema ist. Er verfügt sowohl über die verteilte Rechenfähigkeit von RDD als auch über die Informationen von Schema, die die Datenstruktur beschreiben.
  • Gilt für domänenspezifische Objekte: Eine stark typisierte Sammlung domänenspezifischer Objekte, die gespeichert und bearbeitet werden können.
  • Parallelbetrieb: Konvertierungen und Operationen können mithilfe von Funktionen oder verwandten Operationen parallel ausgeführt werden.

3. Die Beziehung zwischen DataFrame und Dataset

  • DataFrame ist ein spezieller Datensatz: DataFrame ist ein Sonderfall von Dataset DataFrame = Dataset[Row]
  • Vereinheitlichung der Datenabstraktions- und Betriebsmethoden: DataFrame und Dataset vereinheitlichen die Datenabstraktions- und Betriebsmethoden von Spark SQL und bieten flexible und leistungsstarke Datenverarbeitungsfunktionen.

7. Grundlegende Verwendung von Spark Sql

1. Scala erstellt ein SparkSession-Objekt

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. So erstellen Sie DataFrame und Dataset

1. Aus einer Sammlung erstellen

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Aus dem Dateisystem lesen

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Aus der relationalen Datenbank lesen

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Lesen Sie aus unstrukturierten Datenquellen

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Erstellen Sie den Datensatz manuell

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. DataFrame-API

Grammatikbeispiel eins

Simulationsdaten (1000 Artikel):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Anforderung: Welche Stadt- und Geschlechtskombinationen haben das höchste Durchschnittsalter bei größeren Bevölkerungsgruppen (Anzahl der IDs &gt;50) und die Rangfolge dieser Kombinationen innerhalb ihrer jeweiligen Geschlechter.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

Ergebnis:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Syntaxbeispiel zwei: view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Syntaxbeispiel drei: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left links beitreten,rignt richtig beitreten,full vollständiger äußerer Join,antilinker Differenzsatz,semilinke Kreuzung

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Ergebnis

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43