Berbagi teknologi

Ikhtisar Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Ikhtisar Spark SQL

percikan SQL Ini adalah modul Apache Spark yang khusus digunakan untuk memproses data terstruktur. Ini mengintegrasikan fungsi-fungsi canggih dari kueri SQL dan pemrograman Spark, menjadikan pemrosesan data besar lebih efisien dan mudah. Melalui Spark SQL, pengguna dapat menggunakan kueri SQL langsung di Spark, atau menggunakan API DataFrame dan DataSet untuk operasi data.

1. Arsitektur Spark SQL

Arsitektur Spark SQL terutama terdiri dari komponen-komponen berikut:

  1. Sesi Percikan: Titik masuk terpadu untuk aplikasi Spark, digunakan untuk membuat DataFrames, DataSets, dan menjalankan kueri SQL.
  2. Pengoptimal katalis: Mesin pengoptimalan kueri Spark SQL, yang bertanggung jawab untuk menguraikan, menganalisis, mengoptimalkan, dan menghasilkan rencana eksekusi fisik.
  3. DataFrame dan API DataSet: Menyediakan antarmuka pemrograman berorientasi objek dan mendukung metode operasi data yang kaya.
  4. Antarmuka sumber data: Mendukung berbagai sumber data, seperti HDFS, S3, HBase, Cassandra, Hive, dll.
  5. mesin eksekusi: Ubah rencana kueri yang dioptimalkan menjadi tugas eksekusi dan jalankan tugas ini secara paralel pada klaster terdistribusi.

2. Fitur Spark SQL

  • Antarmuka akses data terpadu: Mendukung berbagai sumber data (seperti CSV, JSON, Parket, Hive, JDBC, HBase, dll.) dan menyediakan antarmuka kueri yang konsisten.
  • DataFrame dan API Kumpulan Data: Menyediakan antarmuka pemrograman berorientasi objek, mendukung operasi tipe aman, dan memfasilitasi pemrosesan data.
  • Pengoptimal katalis: Secara otomatis mengubah kueri pengguna menjadi rencana eksekusi yang efisien untuk meningkatkan kinerja kueri.
  • Integrasi dengan Hive: Mengintegrasikan Hive dengan mulus, dapat langsung mengakses data Hive yang ada, dan menggunakan UDF dan UDAF Hive.
  • kinerja tinggi: Mencapai kinerja kueri dan manajemen memori yang efisien melalui pengoptimal Catalyst dan mesin eksekusi Tungsten.
  • Berbagai metode operasi: Mendukung dua mode operasi: pemrograman SQL dan API, dengan fleksibilitas tinggi.
  • Antarmuka alat eksternal: Menyediakan antarmuka JDBC/ODBC untuk alat pihak ketiga untuk menggunakan Spark untuk pemrosesan data.
  • Antarmuka tingkat lanjut: Menyediakan antarmuka tingkat tinggi untuk memproses data dengan nyaman.

3. Prinsip operasi Spark SQL

Masukkan deskripsi gambar di sini

Penguraian Kueri: Mem-parsing kueri SQL menjadi pohon sintaksis abstrak (AST).

Pembuatan Rencana Logis: Ubah AST menjadi rencana logis yang tidak dioptimalkan.

Optimasi Rencana Logis: Gunakan pengoptimal Catalyst untuk mengoptimalkan serangkaian aturan pada rencana logis.

Pembuatan Rencana Fisik: Ubah rencana logis yang dioptimalkan menjadi satu atau lebih rencana fisik dan pilih rencana fisik yang optimal.

Eksekusi: Ubah rencana fisik menjadi RDD dan jalankan secara paralel di cluster.

4. Ikhtisar Spark SQL API

Konteks Spark : SparkContext adalah titik masuk utama aplikasi Spark dan bertanggung jawab untuk menghubungkan ke kluster Spark, mengelola sumber daya, dan penjadwalan tugas. Setelah Spark 2.0, disarankan untuk menggunakan SparkSession daripada SparkContext.

Konteks SQL : SQLContext adalah titik masuk pemrograman untuk Spark SQL, yang memungkinkan pengguna melakukan pemrosesan data melalui kueri SQL atau DataFrame API. Ini menyediakan fungsionalitas Spark SQL dasar.

Konteks Hive: HiveContext adalah bagian dari SQLContext, yang menambahkan dukungan terintegrasi untuk Hive. Hive dapat langsung mengakses data dan metadata di Hive, menggunakan UDF dan UDAF Hive.

Sesi Percikan : SparkSession adalah konsep baru yang diperkenalkan di Spark 2.0. Ini menggabungkan fungsi SQLContext dan HiveContext dan menyediakan antarmuka pemrograman terpadu. SparkSession adalah titik masuk yang direkomendasikan untuk Spark SQL, yang mendukung pemrosesan data menggunakan DataFrame dan Dataset API.

Catatan tentang pembuatan SparkContext dan SparkSession : Jika Anda perlu membuat SparkContext dan SparkSession secara bersamaan, Anda harus membuat SparkContext terlebih dahulu, lalu membuat SparkSession. Jika Anda membuat SparkSession terlebih dahulu lalu membuat SparkContext, pengecualian akan terjadi karena hanya satu SparkContext yang dapat berjalan di JVM yang sama.

5. Percikan ketergantungan SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Percikan Kumpulan Data SQL

Di Spark SQL, kumpulan data sebagian besar dibagi menjadi beberapa jenis berikut: DataFrame dan Kumpulan Data. Mereka adalah abstraksi inti untuk memproses dan memanipulasi data terstruktur dan semi-terstruktur.

1.Bingkai Data

Kumpulan data adalah struktur data abstrak baru yang diperkenalkan di Spark 2.0. Dataset ini diketik dengan kuat dan dapat menyimpan objek JVM. Dataset API menggabungkan kesederhanaan operasional dan keamanan tipe DataFrame dan cocok untuk skenario yang memerlukan tingkat kontrol tipe data yang lebih tinggi dan gaya pemrograman berorientasi objek. Fitur spesifiknya adalah sebagai berikut:

  • Mirip dengan tabel dua dimensi: DataFrame mirip dengan tabel dua dimensi dalam database relasional tradisional.
  • Skema (informasi struktur data): Skema ditambahkan berdasarkan RDD untuk menggambarkan informasi struktur data.
  • Mendukung tipe data bersarang: Skema DataFrame mendukung tipe data bersarang, seperti structmap Danarray
  • API operasi SQL yang kaya: Menyediakan lebih banyak API yang mirip dengan operasi SQL untuk memfasilitasi kueri dan pengoperasian data.

2、Kumpulan data

Kumpulan data adalah struktur data abstrak baru yang diperkenalkan di Spark 2.0. Dataset ini diketik dengan kuat dan dapat menyimpan objek JVM. Dataset API menggabungkan kesederhanaan operasional dan keamanan tipe DataFrame dan cocok untuk skenario yang memerlukan tingkat kontrol tipe data yang lebih tinggi dan gaya pemrograman berorientasi objek. Fitur spesifiknya adalah sebagai berikut:

  • Diketik dengan kuat: Pengumpulan data yang lebih umum yang diperkenalkan di Spark 1.6, Kumpulan Data diketik dengan kuat dan menyediakan operasi yang aman untuk tipe.
  • RDD + Skema: Dapat dianggap bahwa Dataset adalah kombinasi RDD dan Skema. Ia memiliki kemampuan komputasi terdistribusi RDD dan informasi Skema yang menjelaskan struktur data.
  • Berlaku untuk objek khusus domain: Kumpulan objek khusus domain yang diketik dengan kuat yang dapat disimpan dan dimanipulasi.
  • Operasi paralel: Konversi dan operasi dapat dilakukan secara paralel menggunakan fungsi atau operasi terkait.

3. Hubungan antara DataFrame dan Dataset

  • DataFrame adalah Kumpulan Data khusus: DataFrame adalah kasus khusus dari Dataset DataFrame = Dataset[Row]
  • Penyatuan abstraksi data dan metode operasi: DataFrame dan Dataset menyatukan abstraksi data dan metode operasi Spark SQL, memberikan kemampuan pemrosesan data yang fleksibel dan kuat.

7. Penggunaan dasar Spark Sql

1. Scala membuat objek SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Cara membuat DataFrame dan Dataset

1. Buat dari koleksi

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Baca dari sistem file

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Membaca dari database relasional

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Membaca dari sumber data tidak terstruktur

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Buat Kumpulan Data secara manual

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、API Bingkai Data

Contoh tata bahasa satu

Data simulasi (1000 item):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Persyaratan: Kombinasi kota dan gender mana yang memiliki rata-rata usia tertinggi dengan populasi lebih besar (jumlah ID &gt;50), dan peringkat kombinasi tersebut berdasarkan gender masing-masing.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

hasil:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Contoh sintaks dua: view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Contoh sintaks tiga: gabung

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left kiri bergabung,rignt benar bergabung,full gabungan luar penuh,antiselisih kiri ditetapkan,semipersimpangan kiri

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

hasil

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43