2024-07-12
Spark SQL is an Apache Spark module designed specifically for processing structured data. It combines the expressive power of SQL queries with Spark programming, making big data processing more efficient and convenient. With Spark SQL, users can run SQL queries directly in Spark, or operate on data through the DataFrame and Dataset APIs.
The Spark SQL query-processing pipeline mainly consists of the following stages:
Query Parsing: Parse SQL queries into an abstract syntax tree (AST).
Logical Plan Generation: Convert the AST into an unoptimized logical plan.
Logical Plan Optimization: Use the Catalyst optimizer to perform a series of rule optimizations on the logical plan.
Physical Plan Generation: Convert the optimized logical plan into one or more physical plans and select the optimal physical plan.
Execution: Convert the physical plan into RDD operations and execute them in parallel on the cluster.
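To see these stages for a concrete query, you can call explain with the extended flag on a DataFrame or SQL query; it prints the parsed, analyzed, and optimized logical plans produced by Catalyst, followed by the chosen physical plan. A minimal sketch (the table and column names here are made up for illustration):

import org.apache.spark.sql.SparkSession

object ExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explain demo")
      .getOrCreate()
    import spark.implicits._

    // A tiny in-memory DataFrame, just to have something to query
    Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age").createOrReplaceTempView("people")

    // extended = true prints the parsed, analyzed and optimized logical plans
    // as well as the selected physical plan
    spark.sql("SELECT name FROM people WHERE age > 26").explain(extended = true)

    spark.stop()
  }
}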
SparkContext: the main entry point of a Spark application, responsible for connecting to the Spark cluster, managing resources, and scheduling tasks. Since Spark 2.0, SparkSession is recommended over using SparkContext directly.
SQLContext: the original programming entry point of Spark SQL, which lets users process data through SQL queries or the DataFrame API. It provides the basic Spark SQL functionality.
HiveContext: a subclass of SQLContext that adds integrated Hive support. It can directly access data and metadata in Hive and use Hive UDFs and UDAFs.
SparkSession: a new concept introduced in Spark 2.0 that combines the functionality of SQLContext and HiveContext behind a unified programming interface. SparkSession is the recommended entry point for Spark SQL and supports data processing with the DataFrame and Dataset APIs.
Note on creating SparkContext and SparkSession together: if you need both, create the SparkContext first and then the SparkSession. Creating the SparkSession first and then a new SparkContext will raise an exception, because only one SparkContext may run in the same JVM.
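A minimal sketch of that order, assuming local mode: the SparkContext is created first, and the later SparkSession.builder().getOrCreate() simply attaches to the context already running in the JVM.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object ContextOrderDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext first
    val conf = new SparkConf().setMaster("local[2]").setAppName("context order")
    val sc = new SparkContext(conf)

    // 2. Then create the SparkSession; getOrCreate() reuses the existing SparkContext
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Both handles point to the same underlying SparkContext
    assert(spark.sparkContext eq sc)

    spark.stop()  // also stops the shared SparkContext
  }
}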
<properties>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
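If the project uses sbt rather than Maven, the equivalent dependency would look roughly like this (a sketch assuming the same Spark 3.1.2 / Scala 2.12 versions; adjust the Scala patch version to your environment):

// build.sbt
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"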
In Spark SQL, data is handled mainly through two core abstractions: DataFrame and Dataset. They are used to process and manipulate structured and semi-structured data.
DataFrame is an untyped, distributed collection of rows organized into named columns, conceptually similar to a table in a relational database. It supports complex column types such as struct, map, and array. In Scala, a DataFrame is simply an alias for a Dataset of rows: DataFrame = Dataset[Row].
Dataset is a strongly typed data abstraction, introduced in Spark 1.6 and unified with the DataFrame API in Spark 2.0. It stores JVM objects, and its API combines the ease of use of DataFrame with compile-time type safety, making it suitable for scenarios that require finer-grained control over data types and an object-oriented programming style.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // Create a SparkConf object and configure the application
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")     // run locally with 4 threads
      .setAppName("spark sql")   // application name

    // Create the SparkSession, the programming entry point for Spark SQL
    val spark: SparkSession = SparkSession.builder()
      .config(conf)      // apply the SparkConf configuration
      .getOrCreate()     // reuse an existing SparkSession or create a new one

    // The SparkContext can be obtained directly from the SparkSession
    val sc: SparkContext = spark.sparkContext

    // Import the SparkSession's implicit conversions to use the DataFrame API conveniently
    import spark.implicits._

    // Data-processing code goes here: create DataFrames and Datasets, run operations, etc.

    // Stop the SparkSession and release resources
    spark.stop()
  }
}
1. Create from a collection
case class Person(name: String, age: Int)  // also used in the examples below

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data1)  // spark is the SparkSession created above; same below

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data2.toDF("name", "age")
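Since DataFrame = Dataset[Row], the two can be converted back and forth; a short sketch reusing the Person case class, the df and ds values, and the spark session from above:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// DataFrame -> Dataset[Person]: attach a concrete type to the untyped rows
val dsFromDf: Dataset[Person] = df.as[Person]

// Dataset[Person] -> DataFrame: drop back to the untyped, Row-based view
val dfFromDs: DataFrame = ds.toDF()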
2. Read from the file system
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
  // .schema() specifies the schema of the CSV file, i.e. the DataFrame's column names and types.
  // This is optional, but required if the CSV file has no header row or if you want to override it.
  .schema(schema)
  // "header" = "true" means the first line of the CSV file contains the column names,
  // so Spark does not need to infer them from the file.
  .option("header", "true")
  .csv("/path/to/csv/file")
3. Read from a relational database
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")
val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]
val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
4. Read from other data sources (e.g. Parquet)
val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]
val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
5. Manually create a Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
Syntax Example 1
Simulated data (1000 records):
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
Requirement: among the city-and-gender groups with a large population (more than 50 ids), which groups have the highest average age, and what is each group's rank within its gender?
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Import the SparkSession's implicit conversions to use the DataFrame shorthand (e.g. the '$' column syntax below)
import spark.implicits._

// An explicit schema for the DataFrame; in this example the CSV header is used instead to name the columns
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType)
))

// WindowSpec for the window function below: partition by gender, order by avg_age descending (reused later)
val windowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// Read the CSV file using its header as column names, then select, group, and aggregate:
// which city/gender combinations have the highest average age among large groups (id count > 50),
// and how do these combinations rank within each gender?
spark.read
  // .schema(schema)                            // apply the schema defined above (optional here)
  .option("header", "true")                     // use the CSV header as column names
  .csv("D:\\projects\\sparkSql\\people.csv")    // DataFrame
  .select($"id", $"name", $"age", $"city", $"gender")  // select the needed columns (all columns by default)
  .groupBy($"city", $"gender")                  // group by city and gender
  .agg(                                         // multiple aggregations
    count($"id").as("count"),                   // number of ids per group
    round(avg($"age"), 2).as("avg_age")         // average age per group, rounded to 2 decimals
  )
  .where($"count".gt(50))                       // keep groups with more than 50 ids (> also works)
  .orderBy($"avg_age".desc)                     // sort by average age, descending
  .select($"city", $"gender", $"avg_age",
    dense_rank().over(windowSpec).as("gender_avg_age_rank"))  // rank within each gender
  .show()                                       // display the result
result:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 东莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|广州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
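The same requirement can also be expressed in SQL over a temporary view, equivalent to the DataFrame pipeline above; a sketch (the view name people_view is chosen for illustration, and inferSchema is enabled so that age is read as a number):

spark.read
  .option("header", "true")
  .option("inferSchema", "true")  // infer numeric column types so AVG(age) operates on numbers
  .csv("D:\\projects\\sparkSql\\people.csv")
  .createOrReplaceTempView("people_view")

spark.sql(
  """
    |SELECT city,
    |       gender,
    |       ROUND(AVG(age), 2) AS avg_age,
    |       DENSE_RANK() OVER (PARTITION BY gender ORDER BY AVG(age) DESC) AS gender_avg_age_rank
    |FROM people_view
    |GROUP BY city, gender
    |HAVING COUNT(id) > 50
    |ORDER BY avg_age DESC
    |""".stripMargin
).show()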
Syntax Example 2: Views and SQL
// Read the CSV file into a DataFrame, using the header as column names
val dfPeople: DataFrame = spark.read
  .option("header", "true")               // use the CSV header as column names
  .csv("D:\\projects\\sparkSql\\people.csv")

// Register the DataFrame as a temporary view
dfPeople.createOrReplaceTempView("people_view")

// The view can now be queried with Spark SQL,
// for example: select everyone's name and age
spark.sql("SELECT name, age FROM people_view").show()

// A second query: all rows where gender is '男' (male)
spark.sql(
  """
    |select * from people_view
    |where gender = '男'
    |""".stripMargin
).show()
Syntax Example 3: join
case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5)
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
Common joinType values (right and full follow the same pattern; see the sketch after the results below):
left: left outer join
right: right outer join
full: full outer join
anti: left anti join, i.e. the left difference (rows of the left table with no match in the right table)
semi: left semi join, i.e. the left intersection (rows of the left table that do have a match in the right table)
// Alias + inner join
frmStu.as("S")
  .join(frmClass.as("C"), $"S.classId" === $"C.classId")  // joinType defaults to inner join
  .show()

// Left outer join of frmStu and frmClass on classId
frmStu
  .join(frmClass, Seq("classId"), "left")
  .show()

// Left anti join (left difference)
frmStu
  .join(frmClass, Seq("classId"), "anti")
  .show()

// Left semi join (left intersection)
frmStu
  .join(frmClass, Seq("classId"), "semi")
  .show()
result:
Alias + inner join
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三| 1| 1| name1|
|李四| 1| 1| name1|
|王五| 2| 2| name2|
|赵六| 2| 2| name2|
|李明| 2| 2| name2|
|王刚| 4| 4| name4|
+----+-------+-------+---------+
Left outer join of frmStu and frmClass on classId
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
| 1|张三| name1|
| 1|李四| name1|
| 2|王五| name2|
| 2|赵六| name2|
| 2|李明| name2|
| 4|王刚| name4|
| 5|王朋| null|
+-------+----+---------+
Left anti join (left difference)
+-------+----+
|classId|name|
+-------+----+
| 5|王朋|
+-------+----+
Left semi join (left intersection)
+-------+----+
|classId|name|
+-------+----+
| 1|张三|
| 1|李四|
| 2|王五|
| 2|赵六|
| 2|李明|
| 4|王刚|
+-------+----+
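The right and full join types from the list above follow the same pattern; a short sketch reusing frmStu and frmClass:

// Right outer join: keep every class, even those without students (e.g. classId 3)
frmStu
  .join(frmClass, Seq("classId"), "right")
  .show()

// Full outer join: keep unmatched rows from both sides
frmStu
  .join(frmClass, Seq("classId"), "full")
  .show()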