Technology sharing

Spark SQL Overview

2024-07-12


Spark SQL Overview

Spark SQL is the Apache Spark module for processing structured data. It integrates powerful SQL query capabilities with Spark's programming model, making large-scale data processing more efficient and convenient. With Spark SQL, users can run SQL queries directly on Spark, or manipulate data through the DataFrame and Dataset APIs.
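
A minimal sketch of these two access paths, assuming an already-created SparkSession named spark and a hypothetical people.csv with id and city columns:

import org.apache.spark.sql.functions.count
import spark.implicits._

val people = spark.read.option("header", "true").csv("/path/to/people.csv")

// Path 1: register a temporary view and use plain SQL
people.createOrReplaceTempView("people")
spark.sql("SELECT city, count(*) AS cnt FROM people GROUP BY city").show()

// Path 2: express the same query with the DataFrame API
people.groupBy($"city").agg(count($"id").as("cnt")).show()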

1. Spark SQL Architecture

The Spark SQL architecture consists mainly of the following components:

  1. SparkSession: the unified entry point for Spark applications, used to create DataFrames and Datasets and to execute SQL queries.
  2. Catalyst optimizer: Spark SQL's query optimization engine, responsible for parsing, analyzing, optimizing queries and generating physical execution plans.
  3. DataFrame and Dataset API: object-oriented programming interfaces that provide rich data manipulation methods.
  4. Data source interface: supports a wide range of data sources such as HDFS, S3, HBase, Cassandra, Hive, etc.
  5. Execution engine: converts optimized query plans into execution tasks and runs those tasks in parallel on the distributed cluster.

2. Spark SQL Features

  • Unified data access interface: supports many data sources (such as CSV, JSON, Parquet, Hive, JDBC, HBase, etc.) and exposes a consistent query interface (see the sketch after this list).
  • DataFrame and Dataset API: object-oriented programming interfaces with type-safe operations that make data processing easier.
  • Catalyst optimizer: automatically turns user queries into efficient execution plans to improve query performance.
  • Hive integration: integrates seamlessly with Hive, can directly access existing Hive data and use Hive UDFs and UDAFs.
  • High performance: achieves efficient query execution and memory management through the Catalyst optimizer and the Tungsten execution engine.
  • Multiple usage modes: supports both SQL and programmatic APIs, providing high flexibility.
  • External tool interface: provides JDBC/ODBC interfaces so that third-party tools can use Spark for data processing.
  • High-level interfaces: offers convenient high-level tooling for data processing.
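
To illustrate the unified data access interface, here is a hedged sketch (paths, table name, and JDBC URL are placeholders; connection properties are omitted): whatever the format, the read API stays the same and every call returns a DataFrame.

val fromCsv     = spark.read.option("header", "true").csv("/data/people.csv")
val fromJson    = spark.read.json("/data/people.json")
val fromParquet = spark.read.parquet("/data/people.parquet")
val fromJdbc    = spark.read.jdbc("jdbc:mysql://localhost:3306/db", "people", new java.util.Properties())

// All four results are DataFrames and can be queried in exactly the same way
fromCsv.select("name", "age").show()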

3. How Spark SQL Works


Query parsing: the SQL query is parsed into an abstract syntax tree (AST).

Logical plan generation: the AST is converted into an unoptimized logical plan.

Logical plan optimization: the Catalyst optimizer applies a series of rules to optimize the logical plan.

Physical plan generation: the optimized logical plan is converted into one or more physical plans, and the best physical plan is selected.

Execution: the physical plan is converted into RDD operations and executed in parallel on the cluster (the sketch below shows how to inspect these plans).
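
You can watch this pipeline directly with explain(true), which prints the parsed, analyzed and optimized logical plans plus the physical plan. A minimal sketch, assuming a SparkSession named spark and a temporary view people_view like the one registered later in this post:

spark.sql("SELECT city, avg(age) FROM people_view GROUP BY city").explain(true)
// == Parsed Logical Plan ==   ... followed by the analyzed, optimized and physical plans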

4. Overview of the Spark SQL API

SparkContext: SparkContext is the main entry point of a Spark application and is responsible for connecting to the Spark cluster, managing resources, and scheduling tasks. Since Spark 2.0, it is recommended to use SparkSession instead of SparkContext.

SQLContext: SQLContext is the programming entry point for Spark SQL. It lets users perform data processing through SQL queries or the DataFrame API, and provides Spark SQL's basic functionality.

HiveContext: HiveContext extends SQLContext with built-in support for Hive. It can directly access Hive data and metadata, and use Hive's UDFs and UDAFs.

SparkSession: SparkSession is a new concept introduced in Spark 2.0. It is the recommended entry point for Spark SQL and supports data processing with the DataFrame and Dataset APIs.
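
Since Spark 2.0, the SparkSession builder covers both the old SQLContext and HiveContext roles. A minimal sketch, assuming the spark-hive module is on the classpath (required for enableHiveSupport) and that a Hive metastore is reachable:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark sql")
  .master("local[*]")
  .enableHiveSupport()  // gives access to the Hive metastore, Hive SerDes and Hive UDFs
  .getOrCreate()

spark.sql("SHOW TABLES").show()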

Notes on creating SparkContext and SparkSession: if you need both a SparkContext and a SparkSession, create the SparkContext first and then the SparkSession. If you create the SparkSession first and then try to create a SparkContext, an exception is thrown, because only one SparkContext can run in the same JVM.
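
A hedged sketch of the recommended ordering: create the SparkContext first, and SparkSession.builder() will reuse the already-running context instead of trying to start a second one.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf  = new SparkConf().setMaster("local[4]").setAppName("spark sql")
val sc    = new SparkContext(conf)                          // created first
val spark = SparkSession.builder().config(conf).getOrCreate() // reuses the active SparkContext

println(spark.sparkContext eq sc) // true: the same JVM holds only one active SparkContext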

5. Spark SQL Dependencies

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

6. Spark SQL Datasets

In Spark SQL, data collections fall mainly into two categories: DataFrame and Dataset. They are the core abstractions for processing and manipulating structured and semi-structured data.

1. DataFrame

DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database. Its main characteristics are:

  • Like a two-dimensional table: a DataFrame resembles a two-dimensional table in a traditional relational database.
  • Schema (structure metadata): on top of an RDD, a Schema is added to describe the structure of the data.
  • Support for nested data types: a DataFrame schema supports nested types such as struct, map and array (see the sketch after this list).
  • Rich SQL-style API: it provides many SQL-like operations that make querying and manipulating data easier.
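
A small sketch of nested types in a DataFrame schema, assuming a SparkSession named spark (the columns are hypothetical):

import org.apache.spark.sql.functions.{array, struct}
import spark.implicits._

val df = Seq(("Alice", 25, "Shanghai"), ("Bob", 30, "Beijing")).toDF("name", "age", "city")

val nested = df.select(
  $"name",
  struct($"age", $"city").as("info"), // nested struct column
  array($"age").as("ages")            // array column
)
nested.printSchema() // prints the schema tree with the struct and array columns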

2. Dataset

Dataset is a data abstraction first introduced in Spark 1.6 and unified with DataFrame in Spark 2.0. The Dataset API combines the operational convenience of DataFrame with compile-time type safety, and suits scenarios that need finer control over data types and an object-oriented programming style. Its main characteristics are:

  • Strongly typed: introduced in Spark 1.6 as a more general data collection, Dataset is strongly typed and provides type-safe operations (see the sketch after this list).
  • RDD + Schema: a Dataset can be thought of as the combination of an RDD and a Schema.
  • Domain-specific objects: a strongly typed collection of domain-specific objects that can be stored and manipulated.
  • Parallel operations: transformations and actions can be applied in parallel using functional operators.
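
Strong typing in action, as a minimal sketch assuming a SparkSession named spark: transformations on a Dataset[Person] operate on the case class itself, so field access is checked at compile time.

import org.apache.spark.sql.Dataset
import spark.implicits._

case class Person(name: String, age: Int)

val ds: Dataset[Person] = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()

val adults: Dataset[Person] = ds
  .filter(_.age >= 18)               // typed lambda instead of a column expression
  .map(p => p.copy(age = p.age + 1)) // returns Person objects, not Rows

adults.show()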

3. Relationship between DataFrame and Dataset

  • DataFrame is a special Dataset: a DataFrame is a Dataset of rows, i.e. DataFrame = Dataset[Row] (see the sketch after this list).
  • Unified data abstraction and operations: together, DataFrame and Dataset unify Spark SQL's data abstraction and operation model, providing flexible and powerful data-processing capabilities.
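
Because DataFrame is just Dataset[Row], converting between the two is a one-liner. A sketch, assuming a SparkSession named spark and the same Person case class as above:

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._

case class Person(name: String, age: Int)

val df: DataFrame = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age")
val ds: Dataset[Person] = df.as[Person] // DataFrame -> typed Dataset
val back: DataFrame = ds.toDF()         // Dataset -> DataFrame
val rows: Dataset[Row] = df             // a DataFrame already *is* a Dataset[Row]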

7. Common Spark SQL Usage

1. Creating a SparkSession object in Scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // Create a SparkConf object and set the application configuration
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // run in local mode with 4 threads
      .setAppName("spark sql") // set the application name to "spark sql"

    // Create the SparkSession object, the programming entry point for Spark SQL
    val spark: SparkSession = SparkSession.builder()
      .config(conf)  // apply the SparkConf configuration to the SparkSession
      .getOrCreate() // reuse an existing SparkSession or create a new one

    // The SparkContext can be obtained directly from the SparkSession
    val sc: SparkContext = spark.sparkContext

    // Import the SparkSession implicits so the DataFrame API helpers are available
    import spark.implicits._

    // Data-processing code goes here: create DataFrames and Datasets, run queries, etc.

    // Stop the SparkSession and release its resources
    spark.stop()
  }
}

2. How to create a DataFrame or Dataset?

1. Create from a collection

case class Person(name: String, age: Int)                // also used in the examples below

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data1)     // `spark` is the SparkSession object created above

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data2.toDF("name", "age")

2. Read from the file system

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
  // Use .schema to specify the schema of the CSV file, i.e. the column names and types.
  // This is optional, but required if the CSV file has no header row or you want to override it.
  .schema(schema)
  // Setting "header" to "true" means the first line of the CSV file contains the column names,
  // so Spark does not need to infer them from the file.
  .option("header", "true")
  .csv("/path/to/csv/file")

3. Read from a relational database

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)

4. Read from Parquet files

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")

5. Create a Dataset manually

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)

3. DataFrame API

Syntax example one: aggregation and window functions

Simulated data (1,000 rows):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...

Requirement: among groups with a larger population (more than 50 IDs), which city and gender combinations have the highest average age, and how do these combinations rank within their respective gender?

// Import the SparkSession implicits so DataFrame helpers (such as the '$' column syntax) are available
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{avg, count, dense_rank, round}
import org.apache.spark.sql.types._
import spark.implicits._

// A schema for the DataFrame; in this example the CSV header is used instead to infer the schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType)
))

// Define a WindowSpec for the window function below: partition by gender, order by avg_age descending (reused later)
val windowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// Read the CSV file using its header as column names, then select, group and aggregate.
// Goal: which city/gender combinations have the highest average age among groups with more than 50 IDs,
// and how do these combinations rank within their gender.
spark.read
  // .schema(schema)                                    // apply the schema defined above (optional here)
  .option("header", "true")                             // use the CSV header as column names
  .csv("D:\\projects\\sparkSql\\people.csv")            // DataFrame
  .select($"id", $"name", $"age", $"city", $"gender")   // select the needed columns (all columns by default)
  .groupBy($"city", $"gender")                          // group by city and gender
  .agg(                                                 // multiple aggregations
    count($"id").as("count"),                           // number of IDs in each group
    round(avg($"age"), 2).as("avg_age")                 // average age per group, rounded to two decimals
  )
  .where($"count".gt(50))                               // keep groups with more than 50 IDs (could also use >)
  .orderBy($"avg_age".desc)                             // sort by average age, descending

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(windowSpec).as("gender_avg_age_rank"))
  .show()                                               // display the result

Result:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

Syntax example two: views and SQL

// Read the CSV file into a DataFrame, using the header as column names
val dfPeople: DataFrame = spark.read
    .option("header", "true") // use the CSV header as column names
    .csv("D:\\projects\\sparkSql\\people.csv")

// Register the DataFrame as a temporary view
dfPeople.createOrReplaceTempView("people_view")
// Now the view can be queried with Spark SQL
// For example, select everyone's name and age
spark.sql("SELECT name, age FROM people_view").show()
// A second query
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()

Syntax example three: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)

Join types: left (left outer join); right (right outer join); full (full outer join); anti (left anti join, i.e. the left-side difference); semi (left semi join, i.e. the left-side intersection). The code below covers inner, left, anti and semi; right and full are shown in the sketch after the results.

// Alias + inner join
frmStu.as("S")
  .join(frmClass.as("C"), $"S.classId" === $"C.classId") // joinType defaults to inner
  .show()

// Left outer join: merge frmStu and frmClass on classId
frmStu
  .join(frmClass, Seq("classId"), "left")
  .show()

// Left anti join (left difference)
frmStu
  .join(frmClass, Seq("classId"), "anti")
  .show()

// Left semi join (left intersection)
frmStu
  .join(frmClass, Seq("classId"), "semi")
  .show()

Results:

Alias + inner join
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

Left outer join of frmStu and frmClass on classId
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

Left anti join (left difference)
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

Left semi join (left intersection)
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
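
The right and full outer joins mentioned in the list above follow the same pattern. A short sketch, reusing frmStu and frmClass from the example:

// Right outer join: keeps every class, even those with no students
frmStu
  .join(frmClass, Seq("classId"), "right")
  .show()

// Full outer join: keeps unmatched rows from both sides
frmStu
  .join(frmClass, Seq("classId"), "full")
  .show()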