Κοινή χρήση τεχνολογίας

Επισκόπηση Spark SQL

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Επισκόπηση Spark SQL

Spark SQL Είναι μια ενότητα του Apache Spark που χρησιμοποιείται ειδικά για την επεξεργασία δομημένων δεδομένων. Ενσωματώνει τις ισχυρές λειτουργίες του SQL query και του προγραμματισμού Spark, καθιστώντας την επεξεργασία μεγάλων δεδομένων πιο αποτελεσματική και ευκολότερη. Μέσω του Spark SQL, οι χρήστες μπορούν να χρησιμοποιούν ερωτήματα SQL απευθείας στο Spark ή να χρησιμοποιούν τα API DataFrame και DataSet για λειτουργίες δεδομένων.

1. Αρχιτεκτονική Spark SQL

Η αρχιτεκτονική του Spark SQL αποτελείται κυρίως από τα ακόλουθα στοιχεία:

  1. SparkSession: Ένα ενοποιημένο σημείο εισόδου για εφαρμογές Spark, που χρησιμοποιείται για τη δημιουργία DataFrames, DataSets και την εκτέλεση ερωτημάτων SQL.
  2. Βελτιστοποιητής καταλύτη: Η μηχανή βελτιστοποίησης ερωτημάτων του Spark SQL, υπεύθυνη για την ανάλυση, την ανάλυση, τη βελτιστοποίηση και τη δημιουργία σχεδίων φυσικής εκτέλεσης.
  3. DataFrame και DataSet API: Παρέχει αντικειμενοστραφή διεπαφή προγραμματισμού και υποστηρίζει μεθόδους λειτουργίας εμπλουτισμένων δεδομένων.
  4. Διεπαφή πηγής δεδομένων: Υποστηρίζει πολλαπλές πηγές δεδομένων, όπως HDFS, S3, HBase, Cassandra, Hive κ.λπ.
  5. μηχανή εκτέλεσης: Μετατρέψτε βελτιστοποιημένα σχέδια ερωτημάτων σε εργασίες εκτέλεσης και εκτελέστε αυτές τις εργασίες παράλληλα σε ένα κατανεμημένο σύμπλεγμα.

2. Spark SQL χαρακτηριστικά

  • Ενοποιημένη διεπαφή πρόσβασης δεδομένων: Υποστηρίζει πολλαπλές πηγές δεδομένων (όπως CSV, JSON, Parquet, Hive, JDBC, HBase, κ.λπ.) και παρέχει μια συνεπή διεπαφή ερωτημάτων.
  • DataFrame και Dataset API: Παρέχει μια αντικειμενοστραφή διεπαφή προγραμματισμού, υποστηρίζει λειτουργίες ασφαλείς για τον τύπο και διευκολύνει την επεξεργασία δεδομένων.
  • Βελτιστοποιητής καταλύτη: Αυτόματη μετατροπή των ερωτημάτων χρήστη σε αποτελεσματικά σχέδια εκτέλεσης για τη βελτίωση της απόδοσης των ερωτημάτων.
  • Ενσωμάτωση με το Hive: Ενσωματώνει απρόσκοπτα το Hive, μπορεί να έχει απευθείας πρόσβαση σε υπάρχοντα δεδομένα Hive και να χρησιμοποιεί το UDF και το UDAF του Hive.
  • υψηλή απόδοση: Επιτύχετε αποτελεσματική απόδοση ερωτημάτων και διαχείριση μνήμης μέσω του βελτιστοποιητή Catalyst και της μηχανής εκτέλεσης βολφραμίου.
  • Διάφορες μέθοδοι λειτουργίας: Υποστηρίζει δύο τρόπους λειτουργίας: προγραμματισμό SQL και API, με υψηλή ευελιξία.
  • Εξωτερική διεπαφή εργαλείου: Παρέχει διεπαφή JDBC/ODBC για εργαλεία τρίτων για χρήση του Spark για επεξεργασία δεδομένων.
  • Προηγμένη διεπαφή: Παρέχει διεπαφή υψηλότερου επιπέδου για εύκολη επεξεργασία δεδομένων.

3. Αρχή λειτουργίας Spark SQL

Εισαγάγετε την περιγραφή της εικόνας εδώ

Ανάλυση ερωτήματος: Ανάλυση ερωτημάτων SQL σε αφηρημένα δέντρα σύνταξης (AST).

Δημιουργία Λογικού Σχεδίου:Μετατροπή AST σε μη βελτιστοποιημένο λογικό σχέδιο.

Βελτιστοποίηση Λογικού Σχεδίου: Χρησιμοποιήστε το εργαλείο βελτιστοποίησης Catalyst για να βελτιστοποιήσετε μια σειρά κανόνων στο λογικό σχέδιο.

Δημιουργία Φυσικού Σχεδίου: Μετατρέψτε το βελτιστοποιημένο λογικό σχέδιο σε ένα ή περισσότερα φυσικά σχέδια και επιλέξτε το βέλτιστο φυσικό σχέδιο.

Εκτέλεση:Μετατρέψτε το φυσικό σχέδιο σε RDD και εκτελέστε το παράλληλα στο σύμπλεγμα.

4. Επισκόπηση του Spark SQL API

SparkContext : Το SparkContext είναι το κύριο σημείο εισόδου της εφαρμογής Spark και είναι υπεύθυνο για τη σύνδεση στο σύμπλεγμα Spark, τη διαχείριση πόρων και τον προγραμματισμό εργασιών. Μετά το Spark 2.0, συνιστάται η χρήση του SparkSession αντί του SparkContext.

SQLCcontext : Το SQLContext είναι το σημείο εισόδου προγραμματισμού για το Spark SQL, που επιτρέπει στους χρήστες να εκτελούν επεξεργασία δεδομένων μέσω ερωτημάτων SQL ή DataFrame API. Παρέχει βασική λειτουργικότητα Spark SQL.

HiveContext: Το HiveContext είναι ένα υποσύνολο του SQLContext, το οποίο προσθέτει ενσωματωμένη υποστήριξη για το Hive Μπορεί να έχει άμεση πρόσβαση σε δεδομένα και μεταδεδομένα στο Hive, χρησιμοποιώντας το UDF και το UDAF του Hive.

SparkSession : Το SparkSession είναι μια νέα ιδέα που εισήχθη στο Spark 2.0. Συγχωνεύει τις λειτουργίες του SQLContext και του HiveContext και παρέχει μια ενοποιημένη διεπαφή προγραμματισμού. Το SparkSession είναι το προτεινόμενο σημείο εισόδου για το Spark SQL, που υποστηρίζει την επεξεργασία δεδομένων με χρήση των DataFrame και Dataset API.

Σημειώσεις για τη δημιουργία SparkContext και SparkSession : Εάν πρέπει να δημιουργήσετε το SparkContext και το SparkSession ταυτόχρονα, πρέπει πρώτα να δημιουργήσετε το SparkContext και μετά να δημιουργήσετε το SparkSession. Εάν δημιουργήσετε πρώτα το SparkSession και μετά δημιουργήσετε το SparkContext, θα προκύψει μια εξαίρεση επειδή μόνο ένα SparkContext μπορεί να εκτελεστεί στο ίδιο JVM.

5. Spark εξαρτήσεις SQL

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6. Spark SQL Dataset

Στο Spark SQL, τα σύνολα δεδομένων χωρίζονται κυρίως στους ακόλουθους τύπους: DataFrame και Dataset. Είναι βασικές αφαιρέσεις για την επεξεργασία και τον χειρισμό δομημένων και ημιδομημένων δεδομένων.

1, DataFrame

Το σύνολο δεδομένων είναι μια νέα αφηρημένη δομή δεδομένων που εισήχθη στο Spark 2.0 Είναι έντονα πληκτρολογημένη και μπορεί να αποθηκεύσει αντικείμενα JVM. Το Dataset API συνδυάζει τη λειτουργική απλότητα και την ασφάλεια τύπου του DataFrame και είναι κατάλληλο για σενάρια που απαιτούν υψηλότερο επίπεδο ελέγχου τύπων δεδομένων και αντικειμενοστραφή στυλ προγραμματισμού. Τα συγκεκριμένα χαρακτηριστικά είναι τα εξής:

  • Παρόμοιο με ένα δισδιάστατο τραπέζι: Το DataFrame είναι παρόμοιο με έναν δισδιάστατο πίνακα σε μια παραδοσιακή σχεσιακή βάση δεδομένων.
  • Σχήμα (πληροφορίες δομής δεδομένων): Το σχήμα προστίθεται με βάση το RDD για να περιγράψει τις πληροφορίες της δομής δεδομένων.
  • Υποστήριξη ένθετων τύπων δεδομένων: Το Σχήμα του DataFrame υποστηρίζει ένθετους τύπους δεδομένων, όπως π.χ structmap καιarray
  • Πλούσιο API λειτουργίας SQL: Παρέχει περισσότερα API παρόμοια με τις λειτουργίες SQL για τη διευκόλυνση της αναζήτησης δεδομένων και της λειτουργίας.

2, Σύνολο δεδομένων

Το σύνολο δεδομένων είναι μια νέα αφηρημένη δομή δεδομένων που εισήχθη στο Spark 2.0 Είναι έντονα πληκτρολογημένη και μπορεί να αποθηκεύσει αντικείμενα JVM. Το Dataset API συνδυάζει τη λειτουργική απλότητα και την ασφάλεια τύπου του DataFrame και είναι κατάλληλο για σενάρια που απαιτούν υψηλότερο επίπεδο ελέγχου τύπων δεδομένων και αντικειμενοστραφή στυλ προγραμματισμού. Τα συγκεκριμένα χαρακτηριστικά είναι τα εξής:

  • Δακτυλογραφημένο έντονα: Μια πιο γενική συλλογή δεδομένων που εισήχθη στο Spark 1.6, το σύνολο δεδομένων πληκτρολογείται έντονα και παρέχει λειτουργίες ασφαλείς για τον τύπο.
  • RDD + Σχήμα: Μπορεί να θεωρηθεί ότι το σύνολο δεδομένων είναι ένας συνδυασμός RDD και Schema Έχει τις κατανεμημένες υπολογιστικές δυνατότητες του RDD και τις πληροφορίες του Schema που περιγράφουν τη δομή δεδομένων.
  • Ισχύει για αντικείμενα συγκεκριμένου τομέα: Μια συλλογή με έντονα πληκτρολόγηση αντικειμένων για συγκεκριμένο τομέα που μπορούν να αποθηκευτούν και να χειριστούν.
  • Παράλληλη λειτουργία: Οι μετατροπές και οι λειτουργίες μπορούν να εκτελεστούν παράλληλα χρησιμοποιώντας συναρτήσεις ή σχετικές λειτουργίες.

3. Η σχέση μεταξύ DataFrame και Dataset

  • Το DataFrame είναι ένα ειδικό σύνολο δεδομένων: Το DataFrame είναι μια ειδική περίπτωση του Dataset, δηλαδή DataFrame = Dataset[Row]
  • Ενοποίηση μεθόδων αφαίρεσης και λειτουργίας δεδομένων: Το DataFrame και το Dataset ενοποιούν τις μεθόδους αφαίρεσης και λειτουργίας δεδομένων του Spark SQL, παρέχοντας ευέλικτες και ισχυρές δυνατότητες επεξεργασίας δεδομένων.

7. Βασική χρήση του Spark Sql

1. Η Scala δημιουργεί αντικείμενο SparkSession

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2. Πώς να δημιουργήσετε DataFrame και Dataset

1. Δημιουργία από μια συλλογή

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1. Διαβάστε από το σύστημα αρχείων

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3. Διαβάστε από σχεσιακή βάση δεδομένων

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. Διαβάστε από μη δομημένες πηγές δεδομένων

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
  • 1
  • 2
  • 3

5. Δημιουργήστε με μη αυτόματο τρόπο σύνολο δεδομένων

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3, DataFrame API

Παράδειγμα γραμματικής ένα

Δεδομένα προσομοίωσης (1000 στοιχεία):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Απαίτηση: Ποιοι συνδυασμοί πόλης και φύλου έχουν τον υψηλότερο μέσο όρο ηλικίας με μεγαλύτερους πληθυσμούς (αριθμός ταυτοτήτων &gt;50) και η κατάταξη αυτών των συνδυασμών εντός των αντίστοιχων φύλων.

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\projects\sparkSql\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

αποτέλεσμα:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Παράδειγμα σύνταξης δύο: προβολή, sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\projects\sparkSql\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Παράδειγμα σύνταξης τρία: join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

left αριστερά συμμετοχή,rignt δεξιά ένωσε,full πλήρης εξωτερική ένωση,antiαριστερά σετ διαφοράς,semiαριστερή διασταύρωση

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

αποτέλεσμα

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43