기술나눔

스파크 SQL 개요

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

스파크 SQL 개요

스파크 SQL 구조화된 데이터를 처리하는 데 특별히 사용되는 Apache Spark의 모듈입니다. SQL 쿼리와 Spark 프로그래밍의 강력한 기능을 통합하여 빅 데이터를 보다 효율적이고 쉽게 처리할 수 있습니다. Spark SQL을 통해 사용자는 Spark에서 직접 SQL 쿼리를 사용하거나 데이터 작업에 DataFrame 및 DataSet API를 사용할 수 있습니다.

1. 스파크 SQL 아키텍처

Spark SQL의 아키텍처는 주로 다음 구성 요소로 구성됩니다.

  1. 스파크세션: DataFrames, DataSets를 생성하고 SQL 쿼리를 실행하는 데 사용되는 Spark 애플리케이션의 통합 진입점입니다.
  2. 촉매 최적화기: 물리적 실행 계획의 구문 분석, 분석, 최적화 및 생성을 담당하는 Spark SQL의 쿼리 최적화 엔진입니다.
  3. DataFrame 및 DataSet API: 객체지향 프로그래밍 인터페이스를 제공하고 풍부한 데이터 연산 방식을 지원합니다.
  4. 데이터 소스 인터페이스: HDFS, S3, HBase, Cassandra, Hive 등과 같은 여러 데이터 소스를 지원합니다.
  5. 실행 엔진: 최적화된 쿼리 계획을 실행 작업으로 변환하고 이러한 작업을 분산 클러스터에서 병렬로 실행합니다.

2. 스파크 SQL 기능

  • 통합 데이터 액세스 인터페이스: 여러 데이터 소스(예: CSV, JSON, Parquet, Hive, JDBC, HBase 등)를 지원하고 일관된 쿼리 인터페이스를 제공합니다.
  • DataFrame 및 데이터세트 API: 객체 지향 프로그래밍 인터페이스를 제공하고 유형이 안전한 작업을 지원하며 데이터 처리를 용이하게 합니다.
  • 촉매 최적화기: 사용자 쿼리를 효율적인 실행 계획으로 자동 변환하여 쿼리 성능을 향상시킵니다.
  • 하이브와 통합: Hive를 원활하게 통합하고 기존 Hive 데이터에 직접 액세스할 수 있으며 Hive의 UDF 및 UDAF를 사용할 수 있습니다.
  • 고성능: Catalyst 옵티마이저와 Tungsten 실행 엔진을 통해 효율적인 쿼리 성능과 메모리 관리를 실현합니다.
  • 다양한 조작 방법: 높은 유연성으로 SQL 및 API 프로그래밍의 두 가지 작업 모드를 지원합니다.
  • 외부 도구 인터페이스: 데이터 처리에 Spark를 사용할 수 있도록 타사 도구에 대한 JDBC/ODBC 인터페이스를 제공합니다.
  • 고급 인터페이스: 데이터를 편리하게 처리할 수 있도록 상위 수준의 인터페이스를 제공합니다.

3. Spark SQL 작동 원리

여기에 이미지 설명을 삽입하세요.

쿼리 구문 분석: SQL 쿼리를 추상 구문 트리(AST)로 구문 분석합니다.

논리적 계획 생성:AST를 최적화되지 않은 논리적 계획으로 변환합니다.

논리적 계획 최적화: Catalyst 최적화 프로그램을 사용하여 논리적 계획에 대한 일련의 규칙을 최적화합니다.

물리적 계획 생성: 최적화된 논리적 계획을 하나 이상의 물리적 계획으로 변환하고 최적의 물리적 계획을 선택합니다.

실행: 물리적 계획을 RDD로 변환하고 클러스터에서 병렬로 실행합니다.

4. 스파크 SQL API 개요

스파크컨텍스트 : SparkContext는 Spark 애플리케이션의 주요 진입점이며 Spark 클러스터 연결, 리소스 관리 및 작업 예약을 담당합니다. Spark 2.0 이후에는 SparkContext 대신 SparkSession을 사용하는 것이 좋습니다.

SQL 컨텍스트 : SQLContext는 Spark SQL의 프로그래밍 진입점으로, 사용자가 SQL 쿼리 또는 DataFrame API를 통해 데이터 처리를 수행할 수 있습니다. 기본적인 Spark SQL 기능을 제공합니다.

하이브 컨텍스트: HiveContext는 Hive에 대한 통합 지원을 추가하는 SQLContext의 하위 집합입니다. Hive의 UDF 및 UDAF를 사용하여 Hive의 데이터 및 메타데이터에 직접 액세스할 수 있습니다.

스파크세션 : SparkSession은 Spark 2.0에서 도입된 새로운 개념으로, SQLContext와 HiveContext의 기능을 병합하여 통일된 프로그래밍 인터페이스를 제공합니다. SparkSession은 DataFrame 및 Dataset API를 사용한 데이터 처리를 지원하는 Spark SQL의 권장 진입점입니다.

SparkContext 및 SparkSession 생성에 대한 참고 사항 : SparkContext와 SparkSession을 동시에 생성해야 하는 경우 먼저 SparkContext를 생성한 후 SparkSession을 생성해야 합니다. SparkSession을 먼저 생성한 후 SparkContext를 생성하면 동일한 JVM에서 하나의 SparkContext만 실행할 수 있으므로 예외가 발생합니다.

5. 스파크 SQL 종속성

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. 스파크 SQL 데이터세트

Spark SQL에서 데이터 세트는 주로 DataFrame과 Dataset 유형으로 나뉩니다. 이는 구조화된 데이터와 반구조화된 데이터를 처리하고 조작하기 위한 핵심 추상화입니다.

1、데이터 프레임

데이터세트는 Spark 2.0에 도입된 새로운 추상 데이터 구조로, 강력한 형식이며 JVM 개체를 저장할 수 있습니다. Dataset API는 DataFrame의 운영 단순성과 유형 안전성을 결합하며 더 높은 수준의 데이터 유형 제어 및 객체 지향 프로그래밍 스타일이 필요한 시나리오에 적합합니다. 구체적인 특징은 다음과 같습니다.

  • 2차원 테이블과 유사: DataFrame은 기존 관계형 데이터베이스의 2차원 테이블과 유사합니다.
  • 스키마(데이터 구조 정보): RDD를 기반으로 데이터 구조의 정보를 기술하기 위해 Schema를 추가합니다.
  • 중첩된 데이터 유형 지원: DataFrame의 스키마는 다음과 같은 중첩된 데이터 유형을 지원합니다. structmap 그리고array
  • 풍부한 SQL 작업 API: 데이터 쿼리 및 작업을 용이하게 하기 위해 SQL 작업과 유사한 API를 더 많이 제공합니다.

2、데이터셋

데이터세트는 Spark 2.0에 도입된 새로운 추상 데이터 구조로, 강력한 형식이며 JVM 개체를 저장할 수 있습니다. Dataset API는 DataFrame의 운영 단순성과 유형 안전성을 결합하며 더 높은 수준의 데이터 유형 제어 및 객체 지향 프로그래밍 스타일이 필요한 시나리오에 적합합니다. 구체적인 특징은 다음과 같습니다.

  • 강력한 형식: Spark 1.6에 도입된 보다 일반적인 데이터 컬렉션인 Dataset은 강력한 형식이며 형식이 안전한 작업을 제공합니다.
  • RDD + 스키마: Dataset은 RDD와 Schema의 조합이라고 볼 수 있습니다. RDD의 분산 컴퓨팅 능력과 데이터 구조를 기술하는 Schema의 정보를 모두 갖고 있습니다.
  • 도메인별 개체에 적용됩니다.: 저장하고 조작할 수 있는 강력한 형식의 도메인별 개체 컬렉션입니다.
  • 병렬 작동: 함수나 관련 연산을 이용하여 변환과 연산을 병렬로 수행할 수 있습니다.

3. DataFrame과 Dataset의 관계

  • DataFrame은 특별한 데이터 세트입니다.: DataFrame은 Dataset의 특별한 경우입니다. DataFrame = Dataset[Row]
  • 데이터 추상화 및 운영 방식의 일원화: DataFrame과 Dataset은 Spark SQL의 데이터 추상화와 운영 방식을 통합하여 유연하고 강력한 데이터 처리 기능을 제공합니다.

7. Spark SQL의 기본 사용법

1. Scala는 SparkSession 객체를 생성합니다.

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. DataFrame 및 Dataset 생성 방법

1. 컬렉션에서 만들기

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. 파일 시스템에서 읽기

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. 관계형 데이터베이스에서 읽기

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. 구조화되지 않은 데이터 소스에서 읽기

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. 수동으로 데이터세트 생성

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、데이터프레임 API

문법 예 1

시뮬레이션 데이터(1000개 항목):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

요구 사항: 더 많은 인구(ID 수 &gt;50)와 함께 평균 연령이 가장 높은 도시 및 성별 조합과 각 성별 내에서 이러한 조합의 순위.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

결과:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

구문 예제 2: view, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

구문 예 3: Join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left 왼쪽 조인,rignt 바로 가입,full 완전 외부 조인,anti왼쪽 차이 세트,semi왼쪽 교차로

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

결과

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43