2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
स्पार्क एसक्यूएल इदं अपाचे स्पार्क् इत्यस्य मॉड्यूल् अस्ति यस्य उपयोगः विशेषतया संरचितदत्तांशस्य संसाधनार्थं भवति । एतत् SQL क्वेरी तथा Spark प्रोग्रामिंग् इत्येतयोः शक्तिशालिनः कार्याणि एकीकृत्य बृहत् आँकडानां संसाधनं अधिकं कुशलं सुलभं च करोति । Spark SQL इत्यस्य माध्यमेन उपयोक्तारः प्रत्यक्षतया Spark इत्यस्मिन् SQL प्रश्नानां उपयोगं कर्तुं शक्नुवन्ति, अथवा आँकडा-सञ्चालनार्थं DataFrame तथा DataSet APIs इत्यस्य उपयोगं कर्तुं शक्नुवन्ति ।
Spark SQL इत्यस्य आर्किटेक्चर मुख्यतया निम्नलिखितघटकाः सन्ति ।
क्वेरी पार्सिंग: SQL प्रश्नान् अमूर्तवाक्यविन्यासवृक्षेषु (AST) विश्लेषणं करोति ।
तार्किक योजना जननम्:AST अअनुकूलित तार्किकयोजनायां परिवर्तयन्तु।
तार्किक योजना अनुकूलन: तार्किकयोजनायां नियमानाम् एकां श्रृङ्खलां अनुकूलितुं Catalyst optimizer इत्यस्य उपयोगं कुर्वन्तु ।
भौतिक योजना जननम्: अनुकूलितं तार्किकयोजनां एकस्मिन् वा अधिकेषु भौतिकयोजनासु परिवर्त्य इष्टतमभौतिकयोजनां चयनं कुर्वन्तु।
क्रियान्वयन: भौतिकयोजनां RDD इत्यत्र परिवर्त्य क्लस्टर् इत्यत्र समानान्तरेण निष्पादयन्तु ।
स्पार्कसंदर्भ : SparkContext Spark अनुप्रयोगस्य मुख्यः प्रवेशबिन्दुः अस्ति तथा च Spark क्लस्टरेन सह सम्बद्धतां, संसाधनानाम् प्रबन्धनं, कार्यनिर्धारणं च कर्तुं उत्तरदायी अस्ति । Spark 2.0 इत्यस्य अनन्तरं SparkContext इत्यस्य स्थाने SparkSession इत्यस्य उपयोगः अनुशंसितः अस्ति ।
SQLContext इति : SQLContext Spark SQL कृते प्रोग्रामिंग प्रवेशबिन्दुः अस्ति, यत् उपयोक्तृभ्यः SQL प्रश्नानां अथवा DataFrame API इत्यस्य माध्यमेन आँकडासंसाधनं कर्तुं शक्नोति । एतत् मूलभूतं Spark SQL कार्यक्षमतां प्रदाति ।
HiveContext इति: HiveContext SQLContext इत्यस्य उपसमूहः अस्ति, यः Hive इत्यस्य एकीकृतसमर्थनं योजयति, Hive इत्यस्य UDF तथा UDAF इत्यस्य उपयोगेन, Hive इत्यस्मिन् प्रत्यक्षतया आँकडान् मेटाडाटा च अभिगन्तुं शक्नोति ।
स्पार्कसत्रम् : SparkSession इति Spark 2.0 इत्यस्मिन् प्रवर्तितं नूतनं अवधारणा अस्ति यत् एतत् SQLContext तथा HiveContext इत्येतयोः कार्ययोः विलयं करोति तथा च एकीकृतं प्रोग्रामिंग इन्टरफेस् प्रदाति । SparkSession Spark SQL कृते अनुशंसितः प्रवेशबिन्दुः अस्ति, यत् DataFrame तथा Dataset APIs इत्येतयोः उपयोगेन आँकडासंसाधनस्य समर्थनं करोति ।
SparkContext तथा SparkSession इत्यस्य निर्माणविषये टिप्पणयः : यदि भवन्तः एकस्मिन् समये SparkContext तथा SparkSession निर्मातुं इच्छन्ति तर्हि प्रथमं SparkContext निर्मातव्यम्, ततः SparkSession निर्मातव्यम् । यदि भवान् प्रथमं SparkSession रचयति ततः SparkContext रचयति तर्हि अपवादः भविष्यति यतोहि केवलं एकः SparkContext एव एकस्मिन् JVM मध्ये चालयितुं शक्नोति ।
<properties>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
Spark SQL इत्यस्मिन् आँकडासमूहाः मुख्यतया निम्नलिखितप्रकारेषु विभक्ताः सन्ति : DataFrame तथा Dataset । ते संरचित-अर्ध-संरचित-दत्तांशस्य संसाधनाय, हेरफेराय च मूल-अमूर्ताः सन्ति ।
Dataset इति Spark 2.0 इत्यस्मिन् प्रवर्तितं नूतनं abstract data structure अस्ति यत् इदं दृढतया टङ्कितम् अस्ति तथा च JVM ऑब्जेक्ट्स् संग्रहीतुं शक्नोति । Dataset API DataFrame इत्यस्य परिचालनसाधारणतां प्रकारसुरक्षां च संयोजयति तथा च एतादृशानां परिदृश्यानां कृते उपयुक्तं भवति येषु उच्चस्तरस्य आँकडाप्रकारनियन्त्रणस्य वस्तु-उन्मुखस्य प्रोग्रामिंगशैल्याः आवश्यकता भवति विशिष्टानि विशेषतानि निम्नलिखितरूपेण सन्ति ।
struct
、map
तथाarray
。Dataset इति Spark 2.0 इत्यस्मिन् प्रवर्तितं नूतनं abstract data structure अस्ति यत् इदं दृढतया टङ्कितम् अस्ति तथा च JVM ऑब्जेक्ट्स् संग्रहीतुं शक्नोति । Dataset API DataFrame इत्यस्य परिचालनसाधारणतां प्रकारसुरक्षां च संयोजयति तथा च एतादृशानां परिदृश्यानां कृते उपयुक्तं भवति येषु उच्चस्तरस्य आँकडाप्रकारनियन्त्रणस्य वस्तु-उन्मुखस्य प्रोग्रामिंगशैल्याः आवश्यकता भवति विशिष्टानि विशेषतानि निम्नलिखितरूपेण सन्ति ।
DataFrame = Dataset[Row]
。import org.apache.spark.sql.SparkSession
object SparkSqlContext {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象,设置应用程序的配置
val conf: SparkConf = new SparkConf()
.setMaster("local[4]") // 设置本地运行模式,使用 4 个线程
.setAppName("spark sql") // 设置应用程序名称为 "spark sql"
// 创建 SparkSession 对象,用于 Spark SQL 的编程入口
val spark: SparkSession = SparkSession.builder()
.config(conf) // 将 SparkConf 配置应用于 SparkSession
.getOrCreate() // 获取现有的 SparkSession,或者新建一个
// 获取 SparkContext 对象,可以直接从 SparkSession 中获取
val sc: SparkContext = spark.sparkContext
// 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
import spark.implicits._
// 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...
// 停止 SparkSession,释放资源
spark.stop()
}
}
1. संग्रहात् रचयतु
case class Person(name: String, age: Int) // 下同
val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data) // 这里的spark是SparkSession对象(如上代码),下同
val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
1. सञ्चिकातन्त्रात् पठन्तु
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]
val dfCsv: DataFrame = spark.read
// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。
.schema(schema)
// 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。
.option("header", "true")
.csv("/path/to/csv/file")
3. सम्बन्धात्मकदत्तांशकोशात् पठन्तु
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")
val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]
val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
4. असंरचितदत्तांशस्रोतात् पठन्तु
val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]
val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
5. Dataset इत्येतत् मैन्युअल् रूपेण रचयन्तु
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]
val dfManual: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(data), schema
)
व्याकरणम् उदाहरणम् एकं
अनुकरणदत्तांशः (१००० वस्तूनि): १.
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
आवश्यकता : कस्य नगरस्य लिंगसंयोजनस्य च बृहत्तरजनसंख्यायुक्तं (ID-सङ्ख्या >50) सर्वाधिकं औसतवयोः भवति, तथा च एतेषां संयोजनानां स्वस्वलिङ्गस्य अन्तः क्रमाङ्कनं भवति।
// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._
// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType),
StructField("gender", StringType),
StructField("age", IntegerType),
StructField("city", StringType),
))
// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
.partitionBy($"gender")
.orderBy($"avg_age".desc)
// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
// .schema(schema) // 应用我们定义的schema
.option("header", "true") // 使用CSV的header作为列名
.csv("D:\projects\sparkSql\people.csv") // DataFrame
.select($"id", $"name", $"age", $"city", $"gender") // 选择需要的列(不写默认就是全选)
.groupBy($"city", $"gender") // 按城市和性别分组
.agg( // 多重聚合
count($"id").as("count"), // 计算每个组的ID数量
round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
)
.where($"count".gt(50)) // 过滤出ID数量大于(可以使用>)50的组
.orderBy($"avg_age".desc) // 按平均年龄降序排序
.select($"city", $"gender", $"avg_age",
dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
.show() // 显示结果
परिणाम:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 东莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|广州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
वाक्य रचना उदाहरणद्वयम् : view, sql
// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
.option("header", "true") // 使用CSV的header作为列名
.csv("D:\projects\sparkSql\people.csv")
// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
"""
|select * from people_view
|where gender = '男'
|""".stripMargin
).show()
वाक्य रचना उदाहरणं त्रीणि : join
case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)
val frmStu = spark.createDataFrame(
Seq(
Student("张三", 1),
Student("李四", 1),
Student("王五", 2),
Student("赵六", 2),
Student("李明", 2),
Student("王刚", 4),
Student("王朋", 5),
)
)
val frmClass = spark.createDataFrame(
Seq(
Class(1, "name1"),
Class(2, "name2"),
Class(3, "name3"),
Class(4, "name4")
)
)
left
वामसंयोगः, २.rignt
दक्षिणं सम्मिलितं, ९.full
पूर्णबाह्यसंयोगः, २.anti
वाम भेद सेट्, २.semi
वामखण्डः
// 别名 + inner 内连接
frmStu.as("S")
.join(frmClass.as("C"), $"S.classId" === $"C.classId") // joinType 默认 inner内连接
.show()
// 使用左外连接将df和frmClass根据classId合并
frmStu
.join(frmClass, Seq("classId"), "left")
.show()
// 左差集
frmStu
.join(frmClass, Seq("classId"), "anti")
.show()
// 左交集
frmStu
.join(frmClass, Seq("classId"), "semi")
.show()
परिणाम
别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三| 1| 1| name1|
|李四| 1| 1| name1|
|王五| 2| 2| name2|
|赵六| 2| 2| name2|
|李明| 2| 2| name2|
|王刚| 4| 4| name4|
+----+-------+-------+---------+
使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
| 1|张三| name1|
| 1|李四| name1|
| 2|王五| name2|
| 2|赵六| name2|
| 2|李明| name2|
| 4|王刚| name4|
| 5|王朋| null|
+-------+----+---------+
左差集
+-------+----+
|classId|name|
+-------+----+
| 5|王朋|
+-------+----+
左交集
+-------+----+
|classId|name|
+-------+----+
| 1|张三|
| 1|李四|
| 2|王五|
| 2|赵六|
| 2|李明|
| 4|王刚|
+-------+----+