2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
(1) Nur wenn ein Aktionsoperator angetroffen wird, wird der gesamte Spark-Job zur Ausführung ausgelöst.
(2) Mehrmals angetroffen, mehrmals ausgeführt
Flexibilität: Die Datenmenge kann groß oder klein sein
RDD ähnelt einem Container, speichert jedoch keine Daten, sondern Berechnungslogik.
Wenn ein Aktionsoperator angetroffen wird, wird die Ausführung des gesamten Spark-Jobs ausgelöst. Ab dem ersten RDD beginnt der Datenfluss.
Daten sind nur eine fließende Beziehung zwischen RDDs und werden nicht gespeichert.
Die Menge der fließenden Daten kann groß oder klein sein, daher wird dies als Elastizität bezeichnet
verteilt:
Spark muss im Wesentlichen Daten von HDFS lesen. HDFS ist verteilt und Datenblöcke können sich in Zukunft auf verschiedenen Datenknoten befinden.
Die in RDD fließenden Daten können aus Blockdaten in verschiedenen Datenknoten stammen.
Datensatz:
Während des Berechnungsablaufs kann RDD kurzzeitig als Container betrachtet werden. Der Container enthält Daten, die standardmäßig nicht im Speicher gespeichert sind.
Es wird später eine Möglichkeit geben, die Daten eines RDD auf der Festplatte zu speichern.
1) Der Parameter minPartitions kann beim Lesen einer Datei nur die Mindestanzahl der Partitionen bestimmen. Die tatsächliche Anzahl der RDD-Partitionen nach dem Lesen der Datei wird durch den Dateninhalt selbst und die Verteilung des Clusters bestimmt.
2) Wenn die Anzahl der eingestellten minPartitionen geringer ist als die Anzahl der Blöcke, wird die Anzahl der Partitionen tatsächlich durch die Anzahl der Blöcke bestimmt.
3) Wenn Sie den Operator zum Generieren von Shuffle aufrufen, können Sie numPartitions (zum Beispiel: groupby()) übergeben, wodurch die Anzahl der Partitionen des RDD tatsächlich geändert werden kann. Wie viele Partitionen Sie festlegen, bestimmt, wie viele Partitionen das endgültige RDD enthält haben.
4) Die Datei wird in Form von Blöcken in HDFS gespeichert. Wenn die Datei den Standardwert von 128 MB nicht erreicht, wird sie ebenfalls in einem Block gespeichert.
Zunächst wird die Anzahl der Partitionen im RDD durch die Anzahl der Blöcke bestimmt, die Daten lesen.
Die Partitionsdaten im letzteren RDD entsprechen mit Ausnahme der KV-Funktion dem Ergebnis der logischen Verarbeitung der Partitionsdaten im vorherigen RDD. Wenn nachfolgende Partitionen nicht verarbeitet werden, hängt die Anzahl der Partitionen aller nachfolgenden RDDs standardmäßig vom ersten RDD ab.
Das endgültige RDD enthält mehrere Partitionen, und in Zukunft werden Sie mehrere Ergebnisdateien in HDFS sehen (HDFS -> RDD -> HDFS).
1) Verlässt sich eng auf eine Eins-zu-Eins-Beziehung, bei der Daten von einer bestimmten Partition im vorherigen RDD nur an eine eindeutige Partition im nächsten RDD gesendet werden (oder mehrere vorherige Partitionen können an die nächste Partition gesendet werden).
2) Eine Eins-zu-Viele-Beziehung mit großer Abhängigkeit von einer bestimmten Partition im vorherigen RDD wird in verschiedene Partitionen im späteren RDD eingehen. Sie können dies auch beurteilen, indem Sie prüfen, ob ein Shuffle generiert wird.
3) Der gesamte Spark-Job wird durch die Anzahl der breiten Abhängigkeiten in mehrere Phasen unterteilt: Num(stage) = Num(wide dependencies) + 1
4) Wenn Sie auf den Operator stoßen, der Shuffle generiert, ist dies der FallDas Phänomen, Daten vom vorherigen RDD auf die Festplatte zu schreiben und Daten von der Festplatte auf das nächste RDD zu lesen.
Hinweis: Wenn die Ausführung zum ersten Mal ausgelöst wird, befinden sich keine Daten auf der Festplatte, sodass die Ausführung mit dem ersten generierten RDD beginnt.
Wenn dieselbe Ausführung wiederholt ausgelöst wird, beginnt die Ausführung für denselben DAG-gesteuerten azyklischen Graphen nach dem Shuffle direkt vom RDD (wobei der Prozess des Schreibens von Daten auf die Festplatte vom vorherigen RDD weggelassen wird) und kann direkt von der Festplatte gelesen werden . Daten.
5) **In einer Phase,RDD hatBei mehreren Partitionen gibt es mehrere parallele Aufgaben.
Eines der Designprinzipien von Spark istDatenlokalisierung(Datenlokalität), das heißt, es wird versucht, Rechenaufgaben auf dem Knoten auszuführen, auf dem sich die Daten befinden, wodurch der Netzwerkübertragungsaufwand für Daten verringert wird.
object WordCount2 {
def main(args: Array[String]): Unit = {
//创建spark配置文件对象
val conf: SparkConf = new SparkConf()
//设置运行模式
//如果是本地local模式运行的话,需要设置setMaster
//将来如果是集群进行,将这句话注释即可
conf.setMaster("local")
//设置spark作业的名字
conf.setAppName("wordcount")
//创建spark core上下文环境对象
val sc: SparkContext = new SparkContext(conf)
//===================================================================================
//读取文件,每次读取一行
//RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
// println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")
//一行数据根据分隔符分割
val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
// println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")
//将每一个单词组成(word,1)
val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")
//根据键进行分组,并设置分区数为 5
val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")
val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
println(s"resRDD的分区数:${resRDD.getNumPartitions}")
//打印
resRDD2.foreach(println)
//指定的是所要写入数据的文件夹的路径
//spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
resRDD.saveAsTextFile("spark/data/outdata1")
}
}
Die Betreiber sind in zwei Kategorien unterteilt:
1. Konvertierungsoperator (RDD -> RDD, Verarbeitungslogik)
2. Aktionsoperator (löst die Jobausführung aus)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo1Map {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
//将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
println("==============处理后的数据========================")
val array1: Array[String] = line.split(",")
(array1(0),array1(1),array1(2),array1(3),array1(4))
})
//foreach是一个行动算子,遇到行动算子,触发作业执行
/**
* 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
* 只有当需要计算一个结果时(即调用行动算子时),才会执行。
* 打印结果:
* ==============处理后的数据========================
* (1500100001,施笑槐,22,女,文科六班)
* ==============处理后的数据========================
* (1500100002,吕金鹏,24,男,文科六班)
*每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
*/
rdd2.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo2Filter {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//需求:过滤出所有的男生
//filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
// 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
// 但是判断中的println()属于scala,并不受影响
val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
var b: Boolean = false
if ("女".equals(line.split(",")(3))) {
println("============这是女生==================")
} else {
println("============这是男生==================")
b = "男".equals(line.split(",")(3))
}
b
})
genderRDD.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo3FlatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")
/**
* flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
* 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
* 打印结果:
* ===============一条数据====================
* hello
* world
* ===============一条数据====================
* java
* hadoop
* linux
*/
val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
println("===============一条数据====================")
line.split("\|")
})
rdd1.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo4Sample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
/**
* sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
* 这个函数主要在机器学习的时候会用到
* withReplacement :
* 为True时,抽样结果中可能会包含重复的元素。
* 为False时,抽样结果中不会包含重复的元素。
* fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
*/
val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)
rdd1.foreach(println)
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo5GroupBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupBy")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
/**
* groupBy算子的使用
*
* 1、groupBy的算子,后面的分组条件是我们自己指定的
* 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/
// val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
// groupRDD.foreach(println)
val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
val clazz: String = kv._1
val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size
(clazz, avgAge)
})
resKvRDD.foreach(println)
// while (true){
//
// }
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo6GroupByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
/**
* GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
* 也就说,只有kv格式的RDD才能调用kv格式的算子
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/
val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
resKvRDD2.foreach(println)
/**
* 面试题:spark core中 groupBy算子与groupByKey算子的区别?
* 1、代码格式上:
* groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
* groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
*
* 2、执行shuffle数据量来看
* groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
* 所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
*/
while (true) {
}
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo7ReduceByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//分别使用groupByKey和reduceBykey计算每个学生的总分
val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(id: String, _, score: String) =>
(id, score.toInt)
}
/**
* groupByKey实现
*/
// val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
// val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
// resRDD1.foreach(println)
/**
* reduceByKey实现
* 输出:
* (1500100113,519)
* (1500100724,440)
* (1500100369,376)
* (1500100378,402)
* (1500100306,505)
* (1500100578,397)
*/
val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
resRDD2.foreach(println)
/**
* 面试题:
* groupByKey与reduceBykey的区别?
* 相同点:
* 它们都是kv格式的算子,只有kv格式的RDD才能调用
* 不同点:
* 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
* 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
* 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
*/
while (true){
}
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo8Union {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
//parallelize:将scala的集合变成spark中的RDD
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "张三"),
("1002", "张三2"),
("1003", "张三3"),
("1004", "张三4"),
("1005", "张三5")
))
println(s"rdd1的分区数:${rdd1.getNumPartitions}")
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1006", "李四6"),
("1007", "李四7"),
("1003", "张三3"),
("1008", "李四8"),
("1009", "李四9")
))
println(s"rdd2的分区数:${rdd2.getNumPartitions}")
val rdd3: RDD[(String, Int)] = sc.parallelize(List(
("1006", 111),
("1007", 22),
("1003", 33),
("1008", 444),
("1009", 55)
))
//两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
//分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
// rdd1.union(rdd3)
val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
resRDD1.foreach(println)
println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")
/**
* 输出:
* rdd1的分区数:1
* rdd2的分区数:1
* (1001,张三)
* (1002,张三2)
* (1003,张三3)
* (1004,张三4)
* (1005,张三5)
* (1006,李四6)
* (1007,李四7)
* (1003,张三3)
* (1008,李四8)
* (1009,李四9)
* resRDD1的分区数:2
*/
}
}
9) beitreten
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* join算子也要作用在kv格式的RDD上
*/
object Demo9Join {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
//parallelize:将scala的集合变成spark中的RDD
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "张三"),
("1002", "李四"),
("1003", "王五"),
("1004", "小明"),
("1005", "小红")
))
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1001", "看美女"),
("1002", "看综艺"),
("1003", "看八卦"),
("1004", "打游戏"),
("1009", "学习")
))
/**
* join 内连接
* right join 右连接
* left join 左连接
* full join 全连接
*/
// join 内连接 两个rdd共同拥有的键才会进行关联
/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
val resRDD2: RDD[(String, String, String)] = resRDD1.map {
case (id: String, (name: String, like: String)) =>
(id, name, like)
}
resRDD2.foreach(println)
//right join 右连接 保证右边rdd键的完整性
/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
val resRDD3: RDD[(String, String, String)] = resRDD2.map {
case (id: String, (Some(name), like: String)) =>
(id, name, like)
case (id: String, (None, like: String)) =>
(id, "查无此人", like)
}
resRDD3.foreach(println)
//left join: 左连接
/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
val resRDD2: RDD[(String, String, String)] = resRDD1.map {
case (id: String, (name: String, Some(like: String))) =>
(id, name, like)
case (id: String, (name: String, None)) =>
(id, name, "此人无爱好")
}
resRDD2.foreach(println)
//全连接,保证所有的键、值的完整
/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
val resRDD3: RDD[(String, String, String)] = resRDD2.map {
case (id: String, (Some(name), Some(like))) =>
(id, name, like)
case (id: String, (Some(name), None)) =>
(id, name, "此人无爱好")
case (id: String, (None, Some(like))) =>
(id, "查无此人", like)
}
resRDD3.foreach(println)
}
}