Обмен технологиями

Обзор Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Обзор Spark SQL

Искра SQL Это модуль Apache Spark, специально используемый для обработки структурированных данных. Он объединяет мощные функции SQL-запросов и программирования Spark, что делает обработку больших данных более эффективной и простой. С помощью Spark SQL пользователи могут использовать SQL-запросы непосредственно в Spark или использовать API-интерфейсы DataFrame и DataSet для операций с данными.

1. Архитектура Spark SQL

Архитектура Spark SQL в основном состоит из следующих компонентов:

  1. SparkSession: унифицированная точка входа для приложений Spark, используемая для создания DataFrames, DataSets и выполнения SQL-запросов.
  2. Оптимизатор катализатора: механизм оптимизации запросов Spark SQL, отвечающий за синтаксический анализ, анализ, оптимизацию и создание планов физического выполнения.
  3. API DataFrame и DataSet: Обеспечивает объектно-ориентированный интерфейс программирования и поддерживает разнообразные методы работы с данными.
  4. Интерфейс источника данных: поддерживает несколько источников данных, таких как HDFS, S3, HBase, Cassandra, Hive и т. д.
  5. механизм исполнения: Преобразуйте оптимизированные планы запросов в задачи выполнения и выполняйте эти задачи параллельно в распределенном кластере.

2. Функции Spark SQL

  • Единый интерфейс доступа к данным: поддерживает несколько источников данных (таких как CSV, JSON, Parquet, Hive, JDBC, HBase и т. д.) и обеспечивает согласованный интерфейс запросов.
  • DataFrame и API набора данных: предоставляет объектно-ориентированный интерфейс программирования, поддерживает типобезопасные операции и облегчает обработку данных.
  • Оптимизатор катализатора: автоматическое преобразование пользовательских запросов в эффективные планы выполнения для повышения производительности запросов.
  • Интеграция с Hive: Полная интеграция Hive, возможность прямого доступа к существующим данным Hive и использование UDF и UDAF Hive.
  • высокая производительность: Обеспечьте эффективную производительность запросов и управление памятью с помощью оптимизатора Catalyst и механизма выполнения Tungsten.
  • Различные методы работы: поддерживает два режима работы: программирование SQL и API, с высокой гибкостью.
  • Внешний интерфейс инструмента: предоставляет интерфейс JDBC/ODBC для сторонних инструментов, позволяющих использовать Spark для обработки данных.
  • Расширенный интерфейс: Обеспечивает интерфейс более высокого уровня для удобной обработки данных.

3. Принцип работы Spark SQL

Вставьте сюда описание изображения

Анализ запросов: анализирует SQL-запросы в абстрактные синтаксические деревья (AST).

Генерация логического плана:Преобразовать AST в неоптимизированный логический план.

Логическая оптимизация плана: используйте оптимизатор Catalyst для оптимизации серии правил на логическом плане.

Генерация физического плана: преобразовать оптимизированный логический план в один или несколько физических планов и выбрать оптимальный физический план.

Исполнение: преобразовать физический план в RDD и выполнить его параллельно в кластере.

4. Обзор API Spark SQL

SparkContext : SparkContext — это основная точка входа приложения Spark, отвечающая за подключение к кластеру Spark, управление ресурсами и планирование задач. После Spark 2.0 рекомендуется использовать SparkSession вместо SparkContext.

SQLContext : SQLContext — это точка входа в программирование для Spark SQL, позволяющая пользователям выполнять обработку данных с помощью запросов SQL или API DataFrame. Он обеспечивает базовую функциональность Spark SQL.

HiveContext: HiveContext — это подмножество SQLContext, которое добавляет интегрированную поддержку Hive. Он может напрямую обращаться к данным и метаданным в Hive, используя UDF и UDAF Hive.

SparkSession : SparkSession — это новая концепция, представленная в Spark 2.0. Она объединяет функции SQLContext и HiveContext и предоставляет унифицированный интерфейс программирования. SparkSession — рекомендуемая точка входа для Spark SQL, поддерживающая обработку данных с помощью API DataFrame и Dataset.

Примечания по созданию SparkContext и SparkSession. : Если вам нужно создать SparkContext и SparkSession одновременно, сначала необходимо создать SparkContext, а затем создать SparkSession. Если вы сначала создадите SparkSession, а затем создадите SparkContext, возникнет исключение, поскольку в одной JVM может работать только один SparkContext.

5. Зависимости Spark SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Набор данных Spark SQL

В Spark SQL наборы данных в основном делятся на следующие типы: DataFrame и Dataset. Это основные абстракции для обработки и манипулирования структурированными и полуструктурированными данными.

1. Кадр данных

Набор данных — это новая абстрактная структура данных, представленная в Spark 2.0. Она строго типизирована и может хранить объекты JVM. API набора данных сочетает в себе простоту эксплуатации и безопасность типов DataFrame и подходит для сценариев, требующих более высокого уровня управления типами данных и объектно-ориентированного стиля программирования. Специфические особенности заключаются в следующем:

  • Похоже на двумерную таблицу: DataFrame похож на двумерную таблицу в традиционной реляционной базе данных.
  • Схема (информация о структуре данных): Добавлена ​​схема на основе RDD для описания информации структуры данных.
  • Поддержка вложенных типов данных: Схема DataFrame поддерживает вложенные типы данных, такие как structmap иarray
  • Богатый API операций SQL: предоставляет больше API-интерфейсов, аналогичных операциям SQL, для облегчения запроса данных и выполнения операций.

2. Набор данных

Набор данных — это новая абстрактная структура данных, представленная в Spark 2.0. Она строго типизирована и может хранить объекты JVM. API набора данных сочетает в себе простоту эксплуатации и безопасность типов DataFrame и подходит для сценариев, требующих более высокого уровня управления типами данных и объектно-ориентированного стиля программирования. Специфические особенности заключаются в следующем:

  • Строго типизированный: более общий сбор данных, представленный в Spark 1.6. Dataset строго типизирован и обеспечивает типобезопасные операции.
  • RDD + Схема: Можно считать, что набор данных представляет собой комбинацию RDD и схемы. Он обладает как возможностями распределенных вычислений RDD, так и информацией схемы, описывающей структуру данных.
  • Применяется к объектам, специфичным для предметной области.: строго типизированная коллекция объектов, специфичных для предметной области, которые можно хранить и манипулировать ими.
  • Параллельная работа: Преобразования и операции могут выполняться параллельно с использованием функций или связанных операций.

3. Связь между DataFrame и Dataset.

  • DataFrame — это специальный набор данных.: DataFrame — это частный случай Dataset, т.е. DataFrame = Dataset[Row]
  • Унификация методов абстракции данных и работы с ними: DataFrame и Dataset объединяют абстракцию данных и методы работы Spark SQL, обеспечивая гибкие и мощные возможности обработки данных.

7. Базовое использование Spark Sql

1. Scala создает объект SparkSession.

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Как создать DataFrame и Dataset

1. Создать из коллекции

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Чтение из файловой системы

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Чтение из реляционной базы данных

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Чтение из неструктурированных источников данных

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Создайте набор данных вручную.

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. API DataFrame

Грамматический пример первый

Данные моделирования (1000 элементов):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Требование: какие комбинации городов и полов имеют самый высокий средний возраст при большей численности населения (количество идентификаторов &gt; 50), а также ранжирование этих комбинаций внутри соответствующих полов.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

результат:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Пример синтаксиса второй: представление, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Пример синтаксиса третий: присоединиться

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left осталось присоединиться,rignt правильно присоединяйся,full полное внешнее соединение,antiнабор левой разницы,semiлевый перекресток

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

результат

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43