Teknologian jakaminen

Spark SQL yleiskatsaus

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Spark SQL yleiskatsaus

Spark SQL Se on Apache Spark -moduuli, jota käytetään erityisesti strukturoidun tiedon käsittelemiseen. Se integroi tehokkaat SQL-kyselyn ja Spark-ohjelmoinnin toiminnot, mikä tekee isojen tietojen käsittelystä tehokkaampaa ja helpompaa. Spark SQL:n avulla käyttäjät voivat käyttää SQL-kyselyitä suoraan Sparkissa tai käyttää DataFrame- ja DataSet-sovellusliittymiä datatoimintoihin.

1. Spark SQL -arkkitehtuuri

Spark SQL:n arkkitehtuuri koostuu pääasiassa seuraavista komponenteista:

  1. SparkSession: Spark-sovellusten yhtenäinen aloituspiste, jota käytetään DataFrame-kehysten, tietojoukkojen luomiseen ja SQL-kyselyjen suorittamiseen.
  2. Katalysaattorin optimoija: Spark SQL:n kyselyn optimointimoottori, joka vastaa fyysisten suoritussuunnitelmien jäsentämisestä, analysoinnista, optimoinnista ja luomisesta.
  3. DataFrame ja DataSet API: Tarjoaa olio-ohjelmointirajapinnan ja tukee monipuolisia datatoimintoja.
  4. Tietolähteen käyttöliittymä: Tukee useita tietolähteitä, kuten HDFS, S3, HBase, Cassandra, Hive jne.
  5. suoritusmoottori: Muunna optimoidut kyselysuunnitelmat suoritustehtäviksi ja suorita nämä tehtävät rinnakkain hajautetussa klusterissa.

2. Spark SQL -ominaisuudet

  • Yhtenäinen tietojen käyttöliittymä: Tukee useita tietolähteitä (kuten CSV, JSON, Parquet, Hive, JDBC, HBase jne.) ja tarjoaa johdonmukaisen kyselyliittymän.
  • DataFrame ja Dataset API: Tarjoaa olio-ohjelmointirajapinnan, tukee tyyppiturvallisia toimintoja ja helpottaa tietojenkäsittelyä.
  • Katalysaattorin optimoija: Muunna käyttäjien kyselyt automaattisesti tehokkaiksi suoritussuunnitelmiksi kyselyn suorituskyvyn parantamiseksi.
  • Integrointi Hiven kanssa: Integroi saumattomasti Hiven, voi käyttää suoraan olemassa olevia Hiven tietoja ja käyttää Hiven UDF- ja UDAF-tiedostoja.
  • korkea suorituskyky: Saavuta tehokas kyselyn suorituskyky ja muistin hallinta Catalyst Optimerin ja Tungsten-suoritusmoottorin avulla.
  • Erilaisia ​​toimintatapoja: Tukee kahta toimintatilaa: SQL- ja API-ohjelmointi, erittäin joustavasti.
  • Ulkoinen työkaluliittymä: Tarjoaa JDBC/ODBC-liitännän kolmannen osapuolen työkaluille, jotka käyttävät Sparkia tietojenkäsittelyyn.
  • Kehittynyt käyttöliittymä: Tarjoaa korkeamman tason käyttöliittymän tietojen käsittelyyn kätevästi.

3. Spark SQL -toimintaperiaate

Lisää kuvan kuvaus tähän

Kyselyn jäsentäminen: Jäsentää SQL-kyselyt abstrakteihin syntaksipuihin (AST).

Loogisen suunnitelman luominen: Muunna AST optimoimattomaksi loogiseksi suunnitelmaksi.

Loogisen suunnitelman optimointi: Optimoi loogisen suunnitelman sääntösarja Catalyst-optimoijalla.

Fyysisen suunnitelman luominen: Muunna optimoitu looginen suunnitelma yhdeksi tai useammaksi fyysiseksi suunnitelmaksi ja valitse optimaalinen fyysinen suunnitelma.

Toteutus:Muunna fyysinen suunnitelma RDD:ksi ja suorita se rinnakkain klusterissa.

4. Spark SQL API:n yleiskatsaus

SparkContext : SparkContext on Spark-sovelluksen tärkein sisääntulopiste, ja se vastaa yhteyden muodostamisesta Spark-klusteriin, resurssien hallinnasta ja tehtävien ajoituksesta. Spark 2.0:n jälkeen on suositeltavaa käyttää SparkSessionia SparkContextin sijaan.

SQLContext : SQLContext on Spark SQL:n ohjelmoinnin aloituspiste, jonka avulla käyttäjät voivat käsitellä tietoja SQL-kyselyjen tai DataFrame API:n kautta. Se tarjoaa Spark SQL:n perustoiminnot.

HiveContext: HiveContext on SQLContextin osajoukko, joka lisää Hiven integroidun tuen. Se voi käyttää Hiven UDF:n ja UDAF:n avulla suoraan tietoja ja metatietoja.

SparkSession : SparkSession on Spark 2.0:ssa esitelty uusi konsepti, joka yhdistää SQLContextin ja HiveContextin toiminnot ja tarjoaa yhtenäisen ohjelmointiliittymän. SparkSession on suositeltu Spark SQL:n aloituspiste, joka tukee tietojenkäsittelyä DataFrame- ja Dataset API -sovellusliittymien avulla.

Huomautuksia SparkContextin ja SparkSessionin luomisesta : Jos sinun on luotava SparkContext ja SparkSession samanaikaisesti, sinun on ensin luotava SparkContext ja sitten SparkSession. Jos luot ensin SparkSessionin ja sitten SparkContextin, tapahtuu poikkeus, koska vain yksi SparkContext voi toimia samassa JVM:ssä.

5. Spark SQL-riippuvuudet

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Spark SQL Dataset

Spark SQL:ssä tietojoukot jaetaan pääasiassa seuraaviin tyyppeihin: DataFrame ja Dataset. Ne ovat keskeisiä abstraktioita strukturoidun ja puolistrukturoidun tiedon käsittelyyn ja käsittelyyn.

1、DataFrame

Tietojoukko on Spark 2.0:ssa käyttöön otettu uusi abstrakti tietorakenne. Se on vahvasti kirjoitettu ja voi tallentaa JVM-objekteja. Dataset API yhdistää DataFramen toiminnan yksinkertaisuuden ja tyyppiturvallisuuden ja soveltuu skenaarioihin, jotka vaativat korkeampaa tietotyyppien hallintaa ja olio-ohjelmointityyliä. Erityisominaisuudet ovat seuraavat:

  • Samanlainen kuin kaksiulotteinen pöytä: DataFrame on samanlainen kuin kaksiulotteinen taulukko perinteisessä relaatiotietokannassa.
  • Schema (tietorakennetiedot): Kaava on lisätty RDD:n perusteella kuvaamaan tietorakenteen tietoja.
  • Tukee sisäkkäisiä tietotyyppejä: DataFramen Schema tukee sisäkkäisiä tietotyyppejä, kuten structmap jaarray
  • Rikas SQL-käyttöliittymä: Tarjoaa enemmän SQL-toimintojen kaltaisia ​​API:ita tietojen kyselyn ja käytön helpottamiseksi.

2. Tietojoukko

Tietojoukko on Spark 2.0:ssa käyttöön otettu uusi abstrakti tietorakenne. Se on vahvasti kirjoitettu ja voi tallentaa JVM-objekteja. Dataset API yhdistää DataFramen toiminnan yksinkertaisuuden ja tyyppiturvallisuuden ja soveltuu skenaarioihin, jotka vaativat korkeampaa tietotyyppien hallintaa ja olio-ohjelmointityyliä. Erityisominaisuudet ovat seuraavat:

  • Vahvasti kirjoitettu: Spark 1.6:ssa käyttöön otettu yleisempi tiedonkeruu, Dataset on vahvasti kirjoitettu ja tarjoaa tyyppiturvallisia toimintoja.
  • RDD + Schema: Voidaan katsoa, ​​että Dataset on RDD:n ja Scheman yhdistelmä. Siinä on RDD:n hajautetut laskentaominaisuudet ja Scheman tietorakennetta kuvaavat tiedot.
  • Koskee toimialuekohtaisia ​​objekteja: Vahvasti kirjoitettu kokoelma toimialuekohtaisia ​​objekteja, joita voidaan tallentaa ja käsitellä.
  • Rinnakkaistoiminta: Muunnoksia ja operaatioita voidaan suorittaa rinnakkain funktioiden tai niihin liittyvien toimintojen avulla.

3. DataFramen ja Datasetin välinen suhde

  • DataFrame on erityinen tietojoukko: DataFrame on Datasetin erikoistapaus DataFrame = Dataset[Row]
  • Tietojen abstraktion ja käyttömenetelmien yhtenäistäminen: DataFrame ja Dataset yhdistävät Spark SQL:n tiedon abstraktio- ja toimintatavat tarjoamalla joustavia ja tehokkaita tietojenkäsittelyominaisuuksia.

7. Spark Sql:n peruskäyttö

1. Scala luo SparkSession-objektin

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. DataFramen ja Datasetin luominen

1. Luo kokoelmasta

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Lue tiedostojärjestelmästä

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Lue relaatiotietokannasta

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Lue jäsentämättömistä tietolähteistä

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Luo tietojoukko manuaalisesti

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、DataFrame API

Kieliopin esimerkki yksi

Simulaatiotiedot (1000 kohdetta):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Vaatimus: Millä kaupunki- ja sukupuoliyhdistelmillä on korkein keski-ikä ja suurempi väestö (tunnusten määrä &gt; 50), ja näiden yhdistelmien sijoittuminen vastaaviin sukupuoliin.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

tulos:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Syntaksiesimerkki kaksi: view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Syntaksiesimerkki kolme: liity

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left vasemmalle liittyä,rignt oikein liity,full täysi ulkoinen liitos,antivasen ero asetettu,semivasen risteys

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

tulos

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43