Compartir tecnología

Descripción general de SparkSQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Descripción general de SparkSQL

Spark SQL Es un módulo de Apache Spark utilizado específicamente para procesar datos estructurados. Integra las potentes funciones de consulta SQL y programación Spark, lo que hace que el procesamiento de big data sea más eficiente y sencillo. A través de Spark SQL, los usuarios pueden usar consultas SQL directamente en Spark o usar las API DataFrame y DataSet para operaciones de datos.

1. Arquitectura Spark SQL

La arquitectura de Spark SQL consta principalmente de los siguientes componentes:

  1. Sesión de chispa: Un punto de entrada unificado para aplicaciones Spark, utilizado para crear DataFrames, DataSets y ejecutar consultas SQL.
  2. Optimizador de catalizador: Motor de optimización de consultas de Spark SQL, responsable de analizar, analizar, optimizar y generar planes de ejecución física.
  3. API de marco de datos y conjunto de datos: Proporciona una interfaz de programación orientada a objetos y admite métodos de operación de datos enriquecidos.
  4. Interfaz de fuente de datos: Admite múltiples fuentes de datos, como HDFS, S3, HBase, Cassandra, Hive, etc.
  5. motor de ejecución: Convierta planes de consulta optimizados en tareas de ejecución y ejecute estas tareas en paralelo en un clúster distribuido.

2. Funciones de Spark SQL

  • Interfaz de acceso a datos unificada: Admite múltiples fuentes de datos (como CSV, JSON, Parquet, Hive, JDBC, HBase, etc.) y proporciona una interfaz de consulta consistente.
  • API de marco de datos y conjunto de datos: Proporciona una interfaz de programación orientada a objetos, admite operaciones con seguridad de tipos y facilita el procesamiento de datos.
  • Optimizador de catalizador: convierta automáticamente las consultas de los usuarios en planes de ejecución eficientes para mejorar el rendimiento de las consultas.
  • Integración con colmena: Integra perfectamente Hive, puede acceder directamente a los datos existentes de Hive y utilizar UDF y UDAF de Hive.
  • alto rendimiento: Logre un rendimiento de consultas y una gestión de memoria eficientes a través del optimizador Catalyst y el motor de ejecución Tungsten.
  • Varios métodos de operación: Admite dos modos de operación: programación SQL y API, con alta flexibilidad.
  • Interfaz de herramienta externa: Proporciona una interfaz JDBC/ODBC para que herramientas de terceros utilicen Spark para el procesamiento de datos.
  • Interfaz avanzada: Proporciona una interfaz de nivel superior para procesar datos cómodamente.

3. Principio operativo de Spark SQL

Insertar descripción de la imagen aquí

Análisis de consultas: analiza consultas SQL en árboles de sintaxis abstracta (AST).

Generación de planes lógicos:Convierta AST a un plan lógico no optimizado.

Optimización del plan lógico: Utilice el optimizador Catalyst para optimizar una serie de reglas en el plan lógico.

Generación de planos físicos: convierta el plan lógico optimizado en uno o más planes físicos y seleccione el plan físico óptimo.

Ejecución: Convierta el plan físico en RDD y ejecútelo en paralelo en el clúster.

4. Descripción general de la API Spark SQL

contexto de chispa : SparkContext es el punto de entrada principal de la aplicación Spark y es responsable de conectarse al clúster Spark, administrar recursos y programar tareas. Después de Spark 2.0, se recomienda utilizar SparkSession en lugar de SparkContext.

contexto sql : SQLContext es el punto de entrada de programación para Spark SQL, que permite a los usuarios realizar procesamiento de datos a través de consultas SQL o API DataFrame. Proporciona funcionalidad básica de Spark SQL.

Contexto de colmena: HiveContext es un subconjunto de SQLContext, que agrega soporte integrado para Hive. Puede acceder directamente a datos y metadatos en Hive, utilizando UDF y UDAF de Hive.

Sesión de chispa : SparkSession es un nuevo concepto introducido en Spark 2.0. Combina las funciones de SQLContext y HiveContext y proporciona una interfaz de programación unificada. SparkSession es el punto de entrada recomendado para Spark SQL y admite el procesamiento de datos mediante las API DataFrame y Dataset.

Notas sobre la creación de SparkContext y SparkSession : Si necesita crear SparkContext y SparkSession al mismo tiempo, primero debe crear SparkContext y luego crear SparkSession. Si crea SparkSession primero y luego crea SparkContext, se producirá una excepción porque solo se puede ejecutar un SparkContext en la misma JVM.

5. Dependencias de Spark SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Conjunto de datos Spark SQL

En Spark SQL, los conjuntos de datos se dividen principalmente en los siguientes tipos: DataFrame y Dataset. Son abstracciones centrales para procesar y manipular datos estructurados y semiestructurados.

1、Marco de datos

El conjunto de datos es una nueva estructura de datos abstracta introducida en Spark 2.0. Está fuertemente tipada y puede almacenar objetos JVM. La API Dataset combina la simplicidad operativa y la seguridad de tipos de DataFrame y es adecuada para escenarios que requieren un mayor nivel de control de tipos de datos y un estilo de programación orientado a objetos. Las características específicas son las siguientes:

  • Similar a una mesa bidimensional: DataFrame es similar a una tabla bidimensional en una base de datos relacional tradicional.
  • Esquema (información de estructura de datos): El esquema se agrega sobre la base de RDD para describir la información de la estructura de datos.
  • Admite tipos de datos anidados: El esquema de DataFrame admite tipos de datos anidados, como structmap yarray
  • API de operación SQL enriquecida: Proporciona más API similares a las operaciones SQL para facilitar la consulta y operación de datos.

2、Conjunto de datos

El conjunto de datos es una nueva estructura de datos abstracta introducida en Spark 2.0. Está fuertemente tipada y puede almacenar objetos JVM. La API Dataset combina la simplicidad operativa y la seguridad de tipos de DataFrame y es adecuada para escenarios que requieren un mayor nivel de control de tipos de datos y un estilo de programación orientado a objetos. Las características específicas son las siguientes:

  • fuertemente tipado: Dataset, una recopilación de datos más general introducida en Spark 1.6, está fuertemente tipada y proporciona operaciones con seguridad de tipos.
  • RDD + Esquema: Se puede considerar que el conjunto de datos es una combinación de RDD y Schema. Tiene tanto la capacidad informática distribuida de RDD como la información de Schema que describe la estructura de datos.
  • Se aplica a objetos específicos del dominio.: una colección fuertemente tipada de objetos específicos de un dominio que se pueden almacenar y manipular.
  • Operación paralela: Las conversiones y operaciones se pueden realizar en paralelo utilizando funciones u operaciones relacionadas.

3. La relación entre DataFrame y Dataset

  • DataFrame es un conjunto de datos especial: DataFrame es un caso especial de Dataset, es decir DataFrame = Dataset[Row]
  • Unificación de métodos de operación y abstracción de datos.: DataFrame y Dataset unifican los métodos de operación y abstracción de datos de Spark SQL, proporcionando capacidades de procesamiento de datos potentes y flexibles.

7. Uso básico de Spark Sql

1. Scala crea el objeto SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Cómo crear DataFrame y Dataset

1. Crear a partir de una colección

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Leer desde el sistema de archivos.

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Leer desde una base de datos relacional

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Leer de fuentes de datos no estructurados

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Crear conjunto de datos manualmente

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. API de DataFrame

Ejemplo uno de gramática

Datos de simulación (1000 artículos):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Requisito: qué combinaciones de ciudad y género tienen la edad promedio más alta con poblaciones más grandes (número de identificaciones &gt;50) y la clasificación de estas combinaciones dentro de sus respectivos géneros.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

resultado:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Ejemplo de sintaxis dos: ver, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Ejemplo de sintaxis tres: unirse

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left unirse a la izquierda,rignt unirse a la derecha,full unión exterior completa,anticonjunto de diferencia izquierda,semiintersección izquierda

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

resultado

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43