Compartilhamento de tecnologia

Visão geral do Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Visão geral do Spark SQL

Spark SQL É um módulo do Apache Spark usado especificamente para processar dados estruturados. Ele integra funções poderosas de consulta SQL e programação Spark, tornando o processamento de big data mais eficiente e fácil. Por meio do Spark SQL, os usuários podem usar consultas SQL diretamente no Spark ou usar as APIs DataFrame e DataSet para operações de dados.

1. Arquitetura Spark SQL

A arquitetura do Spark SQL consiste principalmente nos seguintes componentes:

  1. Sessão Spark: um ponto de entrada unificado para aplicativos Spark, usado para criar DataFrames, DataSets e executar consultas SQL.
  2. Otimizador de catalisador: mecanismo de otimização de consultas do Spark SQL, responsável por analisar, analisar, otimizar e gerar planos de execução física.
  3. API DataFrame e DataSet: Fornece interface de programação orientada a objetos e oferece suporte a métodos ricos de operação de dados.
  4. Interface de fonte de dados: Suporta múltiplas fontes de dados, como HDFS, S3, HBase, Cassandra, Hive, etc.
  5. mecanismo de execução: converta planos de consulta otimizados em tarefas de execução e execute essas tarefas em paralelo em um cluster distribuído.

2. Recursos do Spark SQL

  • Interface unificada de acesso a dados: oferece suporte a várias fontes de dados (como CSV, JSON, Parquet, Hive, JDBC, HBase, etc.) e fornece uma interface de consulta consistente.
  • API DataFrame e conjunto de dados: fornece uma interface de programação orientada a objetos, oferece suporte a operações com segurança de tipo e facilita o processamento de dados.
  • Otimizador de catalisador: converta automaticamente as consultas do usuário em planos de execução eficientes para melhorar o desempenho da consulta.
  • Integração com Hive: integra perfeitamente o Hive, pode acessar diretamente os dados existentes do Hive e usar UDF e UDAF do Hive.
  • alta performance: obtenha desempenho de consulta e gerenciamento de memória eficientes por meio do otimizador Catalyst e do mecanismo de execução Tungsten.
  • Vários métodos de operação: Suporta dois modos de operação: programação SQL e API, com alta flexibilidade.
  • Interface de ferramenta externa: fornece interface JDBC/ODBC para ferramentas de terceiros usarem o Spark para processamento de dados.
  • Interface avançada: Fornece uma interface de nível superior para processar dados de maneira conveniente.

3. Princípio de funcionamento do Spark SQL

Insira a descrição da imagem aqui

Análise de consulta: analisa consultas SQL em árvores de sintaxe abstratas (AST).

Geração de Plano Lógico:Converta AST em plano lógico não otimizado.

Otimização do Plano Lógico: use o otimizador Catalyst para otimizar uma série de regras no plano lógico.

Geração de Plano Físico: converta o plano lógico otimizado em um ou mais planos físicos e selecione o plano físico ideal.

Execução: Converta o plano físico em RDD e execute-o em paralelo no cluster.

4. Visão geral da API Spark SQL

Contexto Spark : SparkContext é o principal ponto de entrada da aplicação Spark e é responsável pela conexão ao cluster Spark, gerenciando recursos e agendamento de tarefas. Após o Spark 2.0, é recomendado usar SparkSession em vez de SparkContext.

SQLContext : SQLContext é o ponto de entrada de programação para Spark SQL, permitindo aos usuários realizar processamento de dados por meio de consultas SQL ou API DataFrame. Ele fornece funcionalidade básica do Spark SQL.

Contexto da Colmeia: HiveContext é um subconjunto de SQLContext, que adiciona suporte integrado para Hive. Ele pode acessar dados e metadados diretamente no Hive, usando UDF e UDAF do Hive.

Sessão Spark : SparkSession é um novo conceito introduzido no Spark 2.0. Ele mescla as funções de SQLContext e HiveContext e fornece uma interface de programação unificada. SparkSession é o ponto de entrada recomendado para Spark SQL, suportando processamento de dados usando as APIs DataFrame e Dataset.

Notas sobre a criação de SparkContext e SparkSession : se você precisar criar SparkContext e SparkSession ao mesmo tempo, deverá primeiro criar SparkContext e, em seguida, criar SparkSession. Se você criar SparkSession primeiro e depois criar SparkContext, ocorrerá uma exceção porque apenas um SparkContext pode ser executado na mesma JVM.

5. Dependências do Spark SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Conjunto de dados Spark SQL

No Spark SQL, os conjuntos de dados são divididos principalmente nos seguintes tipos: DataFrame e Dataset. São abstrações básicas para processamento e manipulação de dados estruturados e semiestruturados.

1、Quadro de dados

Dataset é uma nova estrutura de dados abstrata introduzida no Spark 2.0. Ela é fortemente tipada e pode armazenar objetos JVM. A API Dataset combina a simplicidade operacional e a segurança de tipo do DataFrame e é adequada para cenários que exigem um nível mais alto de controle de tipo de dados e estilo de programação orientado a objetos. Os recursos específicos são os seguintes:

  • Semelhante a uma tabela bidimensional: DataFrame é semelhante a uma tabela bidimensional em um banco de dados relacional tradicional.
  • Esquema (informações da estrutura de dados): O esquema é adicionado com base no RDD para descrever as informações da estrutura de dados.
  • Suporta tipos de dados aninhados: o esquema do DataFrame suporta tipos de dados aninhados, como structmap earray
  • API de operação SQL avançada: fornece mais APIs semelhantes às operações SQL para facilitar a consulta e operação de dados.

2、Conjunto de dados

Dataset é uma nova estrutura de dados abstrata introduzida no Spark 2.0. Ela é fortemente tipada e pode armazenar objetos JVM. A API Dataset combina a simplicidade operacional e a segurança de tipo do DataFrame e é adequada para cenários que exigem um nível mais alto de controle de tipo de dados e estilo de programação orientado a objetos. Os recursos específicos são os seguintes:

  • Fortemente digitado: uma coleta de dados mais geral introduzida no Spark 1.6, o Dataset é fortemente tipado e fornece operações com segurança de tipo.
  • RDD + Esquema: Pode-se considerar que o Dataset é uma combinação de RDD e Schema. Possui a capacidade de computação distribuída do RDD e as informações do Schema que descrevem a estrutura de dados.
  • Aplica-se a objetos específicos de domínio: uma coleção fortemente tipada de objetos específicos de domínio que podem ser armazenados e manipulados.
  • Operação paralela: Conversões e operações podem ser realizadas em paralelo usando funções ou operações relacionadas.

3. A relação entre DataFrame e Dataset

  • DataFrame é um conjunto de dados especial: DataFrame é um caso especial de Dataset, ou seja DataFrame = Dataset[Row]
  • Unificação de abstração de dados e métodos de operação: DataFrame e Dataset unificam os métodos de abstração e operação de dados do Spark SQL, fornecendo recursos de processamento de dados flexíveis e poderosos.

7. Uso básico do Spark SQL

1. Scala cria objeto SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Como criar DataFrame e Dataset

1. Crie a partir de uma coleção

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Leia do sistema de arquivos

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Leia do banco de dados relacional

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Leia fontes de dados não estruturadas

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Crie manualmente o conjunto de dados

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、API DataFrame

Exemplo de gramática um

Dados de simulação (1000 itens):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Requisito: Quais combinações de cidade e gênero têm a idade média mais alta com populações maiores (número de IDs &gt;50) e a classificação dessas combinações dentro de seus respectivos gêneros.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

resultado:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Exemplo de sintaxe dois: view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Exemplo de sintaxe três: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left Associação à esquerda,rignt junte-se à direita,full junção externa completa,anticonjunto de diferenças à esquerda,semiintersecção à esquerda

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

resultado

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43