私の連絡先情報
郵便メール:
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
スパークSQL これは、構造化データの処理に特に使用される Apache Spark のモジュールです。 SQL クエリと Spark プログラミングの強力な機能を統合し、ビッグ データの処理をより効率的かつ簡単にします。 Spark SQL を介して、ユーザーは Spark で直接 SQL クエリを使用したり、データ操作に DataFrame および DataSet API を使用したりできます。
Spark SQL のアーキテクチャは主に次のコンポーネントで構成されます。
クエリ解析: SQL クエリを抽象構文ツリー (AST) に解析します。
論理計画の生成:AST を最適化されていない論理プランに変換します。
論理計画の最適化: Catalyst オプティマイザーを使用して、論理プランの一連のルールを最適化します。
物理計画の生成: 最適化された論理プランを 1 つ以上の物理プランに変換し、最適な物理プランを選択します。
実行: 物理プランを RDD に変換し、クラスター上で並列実行します。
スパークコンテキスト : SparkContext は、Spark アプリケーションのメイン エントリ ポイントであり、Spark クラスターへの接続、リソースの管理、およびタスクのスケジューリングを担当します。 Spark 2.0 以降では、SparkContext の代わりに SparkSession を使用することをお勧めします。
SQLコンテキスト : SQLContext は Spark SQL のプログラミング エントリ ポイントであり、ユーザーが SQL クエリまたは DataFrame API を通じてデータ処理を実行できるようにします。基本的な Spark SQL 機能を提供します。
ハイブコンテキスト: HiveContext は SQLContext のサブセットであり、Hive の統合サポートを追加し、Hive の UDF と UDAF を使用して Hive 内のデータとメタデータに直接アクセスできます。
スパークセッション : SparkSession は、Spark 2.0 で導入された新しい概念であり、SQLContext と HiveContext の機能を統合し、統一されたプログラミング インターフェイスを提供します。 SparkSession は Spark SQL の推奨エントリ ポイントであり、DataFrame および Dataset API を使用したデータ処理をサポートします。
SparkContextとSparkSessionの作成に関する注意事項注: SparkContext と SparkSession を同時に作成する必要がある場合は、最初に SparkContext を作成してから、SparkSession を作成する必要があります。最初に SparkSession を作成してから SparkContext を作成すると、同じ JVM 内で実行できる SparkContext は 1 つだけであるため、例外が発生します。
<properties>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
Spark SQL では、データセットは主に DataFrame と Dataset のタイプに分類されます。これらは、構造化データおよび半構造化データを処理および操作するための中核となる抽象化です。
データセットは、Spark 2.0 で導入された新しい抽象データ構造であり、厳密に型指定されており、JVM オブジェクトを格納できます。 Dataset API は、DataFrame の操作の単純さとタイプ セーフティを組み合わせており、より高いレベルのデータ型制御とオブジェクト指向プログラミング スタイルを必要とするシナリオに適しています。具体的な機能は次のとおりです。
struct
、map
そしてarray
。データセットは、Spark 2.0 で導入された新しい抽象データ構造であり、厳密に型指定されており、JVM オブジェクトを格納できます。 Dataset API は、DataFrame の操作の単純さとタイプ セーフティを組み合わせており、より高いレベルのデータ型制御とオブジェクト指向プログラミング スタイルを必要とするシナリオに適しています。具体的な機能は次のとおりです。
DataFrame = Dataset[Row]
。import org.apache.spark.sql.SparkSession
object SparkSqlContext {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象,设置应用程序的配置
val conf: SparkConf = new SparkConf()
.setMaster("local[4]") // 设置本地运行模式,使用 4 个线程
.setAppName("spark sql") // 设置应用程序名称为 "spark sql"
// 创建 SparkSession 对象,用于 Spark SQL 的编程入口
val spark: SparkSession = SparkSession.builder()
.config(conf) // 将 SparkConf 配置应用于 SparkSession
.getOrCreate() // 获取现有的 SparkSession,或者新建一个
// 获取 SparkContext 对象,可以直接从 SparkSession 中获取
val sc: SparkContext = spark.sparkContext
// 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
import spark.implicits._
// 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...
// 停止 SparkSession,释放资源
spark.stop()
}
}
1. コレクションから作成する
case class Person(name: String, age: Int) // 下同
val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data) // 这里的spark是SparkSession对象(如上代码),下同
val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
1. ファイルシステムから読み取る
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]
val dfCsv: DataFrame = spark.read
// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。
.schema(schema)
// 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。
.option("header", "true")
.csv("/path/to/csv/file")
3. リレーショナル データベースからの読み取り
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")
val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]
val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
4. 非構造化データソースからの読み取り
val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]
val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
5. データセットを手動で作成する
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]
val dfManual: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(data), schema
)
文法例 1
シミュレーションデータ(1000件):
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
要件: 人口が多く(ID 数 > 50)、平均年齢が最も高い都市と性別の組み合わせ、およびそれぞれの性別内でのこれらの組み合わせのランキング。
// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._
// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType),
StructField("gender", StringType),
StructField("age", IntegerType),
StructField("city", StringType),
))
// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
.partitionBy($"gender")
.orderBy($"avg_age".desc)
// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
// .schema(schema) // 应用我们定义的schema
.option("header", "true") // 使用CSV的header作为列名
.csv("D:\projects\sparkSql\people.csv") // DataFrame
.select($"id", $"name", $"age", $"city", $"gender") // 选择需要的列(不写默认就是全选)
.groupBy($"city", $"gender") // 按城市和性别分组
.agg( // 多重聚合
count($"id").as("count"), // 计算每个组的ID数量
round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
)
.where($"count".gt(50)) // 过滤出ID数量大于(可以使用>)50的组
.orderBy($"avg_age".desc) // 按平均年龄降序排序
.select($"city", $"gender", $"avg_age",
dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
.show() // 显示结果
結果:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 东莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|广州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
構文例 2: view、sql
// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
.option("header", "true") // 使用CSV的header作为列名
.csv("D:\projects\sparkSql\people.csv")
// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
"""
|select * from people_view
|where gender = '男'
|""".stripMargin
).show()
構文例 3: join
case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)
val frmStu = spark.createDataFrame(
Seq(
Student("张三", 1),
Student("李四", 1),
Student("王五", 2),
Student("赵六", 2),
Student("李明", 2),
Student("王刚", 4),
Student("王朋", 5),
)
)
val frmClass = spark.createDataFrame(
Seq(
Class(1, "name1"),
Class(2, "name2"),
Class(3, "name3"),
Class(4, "name4")
)
)
left
左結合、rignt
右結合、full
完全外部結合、anti
左差分セット、semi
左交差点
// 别名 + inner 内连接
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId") // joinType 默认 inner内连接
.show()
// 使用左外连接将df和frmClass根据classId合并
frmStu
.join(frmClass, Seq("classId"), "left")
.show()
// 左差集
frmStu
.join(frmClass, Seq("classId"), "anti")
.show()
// 左交集
frmStu
.join(frmClass, Seq("classId"), "semi")
.show()
結果
别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三| 1| 1| name1|
|李四| 1| 1| name1|
|王五| 2| 2| name2|
|赵六| 2| 2| name2|
|李明| 2| 2| name2|
|王刚| 4| 4| name4|
+----+-------+-------+---------+
使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
| 1|张三| name1|
| 1|李四| name1|
| 2|王五| name2|
| 2|赵六| name2|
| 2|李明| name2|
| 4|王刚| name4|
| 5|王朋| null|
+-------+----+---------+
左差集
+-------+----+
|classId|name|
+-------+----+
| 5|王朋|
+-------+----+
左交集
+-------+----+
|classId|name|
+-------+----+
| 1|张三|
| 1|李四|
| 2|王五|
| 2|赵六|
| 2|李明|
| 4|王刚|
+-------+----+