技術共有

Spark SQL の概要

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Spark SQL の概要

スパークSQL これは、構造化データの処理に特に使用される Apache Spark のモジュールです。 SQL クエリと Spark プログラミングの強力な機能を統合し、ビッグ データの処理をより効率的かつ簡単にします。 Spark SQL を介して、ユーザーは Spark で直接 SQL クエリを使用したり、データ操作に DataFrame および DataSet API を使用したりできます。

1.Spark SQL アーキテクチャ

Spark SQL のアーキテクチャは主に次のコンポーネントで構成されます。

  1. スパークセッション: Spark アプリケーションの統合エントリ ポイント。DataFrame、DataSet の作成、SQL クエリの実行に使用されます。
  2. 触媒オプティマイザー: Spark SQL のクエリ最適化エンジン。物理的な実行プランの解析、分析、最適化、生成を担当します。
  3. DataFrame と DataSet API: オブジェクト指向プログラミング インターフェイスを提供し、豊富なデータ操作メソッドをサポートします。
  4. データソースインターフェース: HDFS、S3、HBase、Cassandra、Hive などの複数のデータ ソースをサポートします。
  5. 実行エンジン: 最適化されたクエリ プランを実行タスクに変換し、分散クラスター上でこれらのタスクを並列実行します。

2. Spark SQL の機能

  • 統合されたデータアクセスインターフェイス: 複数のデータ ソース (CSV、JSON、Parquet、Hive、JDBC、HBase など) をサポートし、一貫したクエリ インターフェイスを提供します。
  • データフレームとデータセット API: オブジェクト指向プログラミング インターフェイスを提供し、タイプ セーフな操作をサポートし、データ処理を容易にします。
  • 触媒オプティマイザー: ユーザー クエリを効率的な実行プランに自動的に変換して、クエリのパフォーマンスを向上させます。
  • ハイブとの統合: Hive をシームレスに統合し、既存の Hive データに直接アクセスし、Hive の UDF および UDAF を使用できます。
  • ハイパフォーマンス: Catalyst オプティマイザーと Tungsten 実行エンジンを通じて、効率的なクエリ パフォーマンスとメモリ管理を実現します。
  • さまざまな操作方法: SQL プログラミングと API プログラミングの 2 つの動作モードをサポートし、高い柔軟性を備えています。
  • 外部ツールインターフェース: データ処理に Spark を使用するサードパーティ ツールに JDBC/ODBC インターフェイスを提供します。
  • 高度なインターフェース: データを便利に処理するための上位レベルのインターフェイスを提供します。

3. Spark SQL の動作原理

ここに画像の説明を挿入します

クエリ解析: SQL クエリを抽象構文ツリー (AST) に解析します。

論理計画の生成:AST を最適化されていない論理プランに変換します。

論理計画の最適化: Catalyst オプティマイザーを使用して、論理プランの一連のルールを最適化します。

物理計画の生成: 最適化された論理プランを 1 つ以上の物理プランに変換し、最適な物理プランを選択します。

実行: 物理プランを RDD に変換し、クラスター上で並列実行します。

4. Spark SQL APIの概要

スパークコンテキスト : SparkContext は、Spark アプリケーションのメイン エントリ ポイントであり、Spark クラスターへの接続、リソースの管理、およびタスクのスケジューリングを担当します。 Spark 2.0 以降では、SparkContext の代わりに SparkSession を使用することをお勧めします。

SQLコンテキスト : SQLContext は Spark SQL のプログラミング エントリ ポイントであり、ユーザーが SQL クエリまたは DataFrame API を通じてデータ処理を実行できるようにします。基本的な Spark SQL 機能を提供します。

ハイブコンテキスト: HiveContext は SQLContext のサブセットであり、Hive の統合サポートを追加し、Hive の UDF と UDAF を使用して Hive 内のデータとメタデータに直接アクセスできます。

スパークセッション : SparkSession は、Spark 2.0 で導入された新しい概念であり、SQLContext と HiveContext の機能を統合し、統一されたプログラミング インターフェイスを提供します。 SparkSession は Spark SQL の推奨エントリ ポイントであり、DataFrame および Dataset API を使用したデータ処理をサポートします。

SparkContextとSparkSessionの作成に関する注意事項注: SparkContext と SparkSession を同時に作成する必要がある場合は、最初に SparkContext を作成してから、SparkSession を作成する必要があります。最初に SparkSession を作成してから SparkContext を作成すると、同じ JVM 内で実行できる SparkContext は 1 つだけであるため、例外が発生します。

5. Spark SQL の依存関係

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Spark SQL データセット

Spark SQL では、データセットは主に DataFrame と Dataset のタイプに分類されます。これらは、構造化データおよび半構造化データを処理および操作するための中核となる抽象化です。

1、データフレーム

データセットは、Spark 2.0 で導入された新しい抽象データ構造であり、厳密に型指定されており、JVM オブジェクトを格納できます。 Dataset API は、DataFrame の操作の単純さとタイプ セーフティを組み合わせており、より高いレベルのデータ型制御とオブジェクト指向プログラミング スタイルを必要とするシナリオに適しています。具体的な機能は次のとおりです。

  • 2次元のテーブルに似ています: DataFrame は、従来のリレーショナル データベースの 2 次元テーブルに似ています。
  • スキーマ(データ構造情報): RDDをベースにスキーマを追加し、データ構造の情報を記述します。
  • ネストされたデータ型をサポートする: DataFrame のスキーマは、次のようなネストされたデータ型をサポートします。 structmap そしてarray
  • 豊富なSQL操作API: データのクエリと操作を容易にする SQL 操作に似た API をさらに提供します。

2、データセット

データセットは、Spark 2.0 で導入された新しい抽象データ構造であり、厳密に型指定されており、JVM オブジェクトを格納できます。 Dataset API は、DataFrame の操作の単純さとタイプ セーフティを組み合わせており、より高いレベルのデータ型制御とオブジェクト指向プログラミング スタイルを必要とするシナリオに適しています。具体的な機能は次のとおりです。

  • 厳密に型指定された: Spark 1.6 で導入されたより一般的なデータ コレクションである Dataset は、厳密に型指定されており、タイプセーフな操作を提供します。
  • RDD + スキーマ: Dataset は、RDD と Schema を組み合わせたものと考えることができます。RDD の分散コンピューティング機能と、データ構造を記述する Schema の情報の両方を備えています。
  • ドメイン固有のオブジェクトに適用されます: 保存および操作できる、厳密に型指定されたドメイン固有のオブジェクトのコレクション。
  • 並列運転: 関数または関連する演算を使用して、変換と演算を並行して実行できます。

3. データフレームとデータセットの関係

  • DataFrame は特別なデータセットです: DataFrame は Dataset の特殊なケースです。 DataFrame = Dataset[Row]
  • データの抽象化と操作方法の統一: DataFrame と Dataset は、Spark SQL のデータ抽象化と操作メソッドを統合し、柔軟で強力なデータ処理機能を提供します。

7. Spark SQLの基本的な使い方

1. Scala は SparkSession オブジェクトを作成します

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. データフレームとデータセットの作成方法

1. コレクションから作成する

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. ファイルシステムから読み取る

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. リレーショナル データベースからの読み取り

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. 非構造化データソースからの読み取り

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. データセットを手動で作成する

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、データフレームAPI

文法例 1

シミュレーションデータ(1000件):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

要件: 人口が多く(ID 数 &gt; 50)、平均年齢が最も高い都市と性別の組み合わせ、およびそれぞれの性別内でのこれらの組み合わせのランキング。

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

結果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

構文例 2: view、sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

構文例 3: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left 左結合、rignt 右結合、full 完全外部結合、anti左差分セット、semi左交差点

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

結果

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43