2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
(1) Koko kipinätyö käynnistetään suoritettavaksi vain, kun toimintooperaattori kohdataan.
(2) Tapattu useita kertoja, teloitettu useita kertoja
Joustavuus: datamäärä voi olla suuri tai pieni
RDD on samanlainen kuin kontti, mutta se ei tallenna tietoja, vaan laskentalogiikkaa.
Kun toimintooperaattori kohdataan, koko kipinätyö käynnistetään suoritettavaksi ensimmäisestä RDD:stä alkaen.
Data on vain virtaava suhde RDD:iden välillä, eikä sitä tallenneta.
Virtaavan datan määrä voi olla suuri tai pieni, joten sitä kutsutaan joustavuudeksi
hajautettu:
Spark tarvitsee olennaisesti datan lukemista HDFS:stä ja datalohkot voivat olla tulevaisuudessa eri datasolmuissa.
RDD:ssä virtaava data voi tulla eri datasolmujen lohkodatasta.
tietojoukko:
Laskentaprosessin aikana RDD voidaan pitää hetken konttina. Säiliössä on dataa, jota ei ole oletuksena tallennettu muistiin.
RDD:n tiedot voidaan tallentaa levylle myöhemmin.
1) MinPartitions-parametri tiedostoa luettaessa voi määrittää vain osioiden vähimmäismäärän. RDD-osioiden todellinen lukumäärä tiedoston lukemisen jälkeen määräytyy itse tietosisällön ja klusterin jakautumisen perusteella.
2) Jos asetettujen minPartitions-määrä on pienempi kuin lohkojen lukumäärä, osioiden lukumäärä määräytyy itse asiassa lohkojen lukumäärän mukaan.
3) Kun soitat satunnaistoiston luovalle operaattorille, voit syöttää numeroPartitionsin (esimerkiksi: groupby()), mikä voi itse asiassa muuttaa RDD:n osioiden määrää Se, kuinka monta osiota määrität, määrittää kuinka monta osiota lopullinen RDD tekee omistaa.
4) Tiedosto tallennetaan HDFS:ään lohkojen muodossa. Jos tiedosto ei saavuta oletusarvoa 128M, se tallennetaan myös lohkoon.
Aluksi RDD:n osioiden lukumäärä määräytyy dataa lukevien lohkojen lukumäärän mukaan.
Jälkimmäisen RDD:n osiodata KV-toimintoa lukuun ottamatta vastaa edellisen RDD:n osiotietojen loogisen käsittelyn tulosta. Oletusarvoisesti, jos myöhempiä osioita ei käsitellä, kaikkien seuraavien RDD:iden osioiden määrä riippuu ensimmäisestä RDD:stä.
Lopullisessa RDD:ssä on useita osioita, ja näet useita tulostiedostoja HDFS:ssä tulevaisuudessa (HDFS -> RDD -> HDFS)
1) Luottaa suppeasti yksi-yhteen-suhteeseen, jossa tiedot tietystä osiosta edellisessä RDD:ssä menevät vain yksilölliseen osioon seuraavassa RDD:ssä (tai useat aikaisemmat osiot voivat siirtyä seuraavaan osioon)
2) Yksi-moneen-suhde, joka on laajasti riippuvainen tietystä osiosta edellisessä RDD:ssä, siirtyy eri osioihin myöhemmässä RDD:ssä. Voit myös arvioida, syntyykö sekoitus.
3) Koko kipinätyö jaetaan useisiin vaiheisiin laajojen riippuvuuksien lukumäärällä, Num(vaihe) = Num(laajat riippuvuudet) + 1
4) Kun kohtaat operaattorin, joka tuottaa sekoituksen, se sisältääIlmiö, jossa tietoja kirjoitetaan edelliseltä RDD:ltä levylle ja luetaan tietoja levyltä seuraavalle RDD:lle.
Huomautus: Kun suoritus käynnistetään ensimmäisen kerran, levyllä ei ole tietoja, joten suoritus alkaa ensimmäisestä luodusta RDD:stä.
Kun sama suoritus käynnistetään toistuvasti, samalle DAG-suunnaiselle asykliselle graafille, suoritus alkaa suoraan RDD:stä sekoituksen jälkeen (jättäen pois edellisen RDD:n tietojen kirjoitusprosessin levylle), ja se voidaan lukea suoraan levyltä. .
5) **Yhdessä vaiheessa,RDD:llä onUseilla osioilla on useita rinnakkaisia tehtäviä.
Yksi Spark-suunnittelun periaatteista ontietojen lokalisointi(Data Locality), eli yritä saada laskentatehtävät suoritetuksi solmussa, jossa data sijaitsee, mikä vähentää verkon tiedonsiirtoa.
object WordCount2 {
def main(args: Array[String]): Unit = {
//创建spark配置文件对象
val conf: SparkConf = new SparkConf()
//设置运行模式
//如果是本地local模式运行的话,需要设置setMaster
//将来如果是集群进行,将这句话注释即可
conf.setMaster("local")
//设置spark作业的名字
conf.setAppName("wordcount")
//创建spark core上下文环境对象
val sc: SparkContext = new SparkContext(conf)
//===================================================================================
//读取文件,每次读取一行
//RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
// println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")
//一行数据根据分隔符分割
val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\|"))
// println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")
//将每一个单词组成(word,1)
val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")
//根据键进行分组,并设置分区数为 5
val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")
val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
println(s"resRDD的分区数:${resRDD.getNumPartitions}")
//打印
resRDD2.foreach(println)
//指定的是所要写入数据的文件夹的路径
//spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
resRDD.saveAsTextFile("spark/data/outdata1")
}
}
Operaattorit on jaettu kahteen luokkaan:
1. Muunnosoperaattori (RDD -> RDD, käsittelylogiikka)
2. Toimintooperaattori (käynnistää työn suorittamisen)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo1Map {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
//将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
val rdd2: RDD[(String,String,String,String,String)] = lineRDD.map((line: String) => {
println("==============处理后的数据========================")
val array1: Array[String] = line.split(",")
(array1(0),array1(1),array1(2),array1(3),array1(4))
})
//foreach是一个行动算子,遇到行动算子,触发作业执行
/**
* 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
* 只有当需要计算一个结果时(即调用行动算子时),才会执行。
* 打印结果:
* ==============处理后的数据========================
* (1500100001,施笑槐,22,女,文科六班)
* ==============处理后的数据========================
* (1500100002,吕金鹏,24,男,文科六班)
*每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
*/
rdd2.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo2Filter {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//需求:过滤出所有的男生
//filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
// 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,
// 但是判断中的println()属于scala,并不受影响
val genderRDD: RDD[String] = lineRDD.filter((line: String) => {
var b: Boolean = false
if ("女".equals(line.split(",")(3))) {
println("============这是女生==================")
} else {
println("============这是男生==================")
b = "男".equals(line.split(",")(3))
}
b
})
genderRDD.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo3FlatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/wcs/words.txt")
/**
* flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
* 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
* 打印结果:
* ===============一条数据====================
* hello
* world
* ===============一条数据====================
* java
* hadoop
* linux
*/
val rdd1: RDD[String] = lineRDD.flatMap((line:String)=>{
println("===============一条数据====================")
line.split("\|")
})
rdd1.foreach(println)
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo4Sample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc: SparkContext = new SparkContext(conf)
//===============================================================
val lineRDD: RDD[String] = sc.textFile("spark/data/students.txt")
/**
* sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
* 这个函数主要在机器学习的时候会用到
* withReplacement :
* 为True时,抽样结果中可能会包含重复的元素。
* 为False时,抽样结果中不会包含重复的元素。
* fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
*/
val rdd1: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.1)
rdd1.foreach(println)
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo5GroupBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupBy")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
/**
* groupBy算子的使用
*
* 1、groupBy的算子,后面的分组条件是我们自己指定的
* 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/
// val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
// groupRDD.foreach(println)
val resKvRDD: RDD[(String, Double)] = groupRDD.map((kv: (String, Iterable[(String, Int)])) => {
val clazz: String = kv._1
val avgAge: Double = kv._2.map(_._2).sum.toDouble / kv._2.size
(clazz, avgAge)
})
resKvRDD.foreach(println)
// while (true){
//
// }
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo6GroupByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("groupByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(_, _, age: String, _, clazz: String) =>
(clazz, age.toInt)
}
/**
* GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
* 也就说,只有kv格式的RDD才能调用kv格式的算子
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/
val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
val resKvRDD2: RDD[(String, Double)] = groupByKeyRDD.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum.toDouble / kv._2.size))
resKvRDD2.foreach(println)
/**
* 面试题:spark core中 groupBy算子与groupByKey算子的区别?
* 1、代码格式上:
* groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
* groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
*
* 2、执行shuffle数据量来看
* groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
* 所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
*/
while (true) {
}
}
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo7ReduceByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")
//求每个班级的平均年龄
val arrayRDD: RDD[Array[String]] = linesRDD.map((line: String) => line.split(","))
//分别使用groupByKey和reduceBykey计算每个学生的总分
val idWithScoreRDD: RDD[(String, Int)] = arrayRDD.map {
case Array(id: String, _, score: String) =>
(id, score.toInt)
}
/**
* groupByKey实现
*/
// val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()
// val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))
// resRDD1.foreach(println)
/**
* reduceByKey实现
* 输出:
* (1500100113,519)
* (1500100724,440)
* (1500100369,376)
* (1500100378,402)
* (1500100306,505)
* (1500100578,397)
*/
val resRDD2: RDD[(String, Int)] = idWithScoreRDD.reduceByKey((v1: Int, v2: Int) => v1 + v2)
resRDD2.foreach(println)
/**
* 面试题:
* groupByKey与reduceBykey的区别?
* 相同点:
* 它们都是kv格式的算子,只有kv格式的RDD才能调用
* 不同点:
* 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
* 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
* 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
*/
while (true){
}
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo8Union {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
//parallelize:将scala的集合变成spark中的RDD
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "张三"),
("1002", "张三2"),
("1003", "张三3"),
("1004", "张三4"),
("1005", "张三5")
))
println(s"rdd1的分区数:${rdd1.getNumPartitions}")
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1006", "李四6"),
("1007", "李四7"),
("1003", "张三3"),
("1008", "李四8"),
("1009", "李四9")
))
println(s"rdd2的分区数:${rdd2.getNumPartitions}")
val rdd3: RDD[(String, Int)] = sc.parallelize(List(
("1006", 111),
("1007", 22),
("1003", 33),
("1008", 444),
("1009", 55)
))
//两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的
//分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定
// rdd1.union(rdd3)
val resRDD1: RDD[(String, String)] = rdd1.union(rdd2)
resRDD1.foreach(println)
println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")
/**
* 输出:
* rdd1的分区数:1
* rdd2的分区数:1
* (1001,张三)
* (1002,张三2)
* (1003,张三3)
* (1004,张三4)
* (1005,张三5)
* (1006,李四6)
* (1007,李四7)
* (1003,张三3)
* (1008,李四8)
* (1009,李四9)
* resRDD1的分区数:2
*/
}
}
9) liity
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* join算子也要作用在kv格式的RDD上
*/
object Demo9Join {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("reduceByKey")
val sc: SparkContext = new SparkContext(conf)
//===================================================
//parallelize:将scala的集合变成spark中的RDD
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("1001", "张三"),
("1002", "李四"),
("1003", "王五"),
("1004", "小明"),
("1005", "小红")
))
val rdd2: RDD[(String, String)] = sc.parallelize(List(
("1001", "看美女"),
("1002", "看综艺"),
("1003", "看八卦"),
("1004", "打游戏"),
("1009", "学习")
))
/**
* join 内连接
* right join 右连接
* left join 左连接
* full join 全连接
*/
// join 内连接 两个rdd共同拥有的键才会进行关联
/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1: RDD[(String, (String, String))] = rdd1.join(rdd2)
val resRDD2: RDD[(String, String, String)] = resRDD1.map {
case (id: String, (name: String, like: String)) =>
(id, name, like)
}
resRDD2.foreach(println)
//right join 右连接 保证右边rdd键的完整性
/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2: RDD[(String, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
val resRDD3: RDD[(String, String, String)] = resRDD2.map {
case (id: String, (Some(name), like: String)) =>
(id, name, like)
case (id: String, (None, like: String)) =>
(id, "查无此人", like)
}
resRDD3.foreach(println)
//left join: 左连接
/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
val resRDD2: RDD[(String, String, String)] = resRDD1.map {
case (id: String, (name: String, Some(like: String))) =>
(id, name, like)
case (id: String, (name: String, None)) =>
(id, name, "此人无爱好")
}
resRDD2.foreach(println)
//全连接,保证所有的键、值的完整
/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
val resRDD3: RDD[(String, String, String)] = resRDD2.map {
case (id: String, (Some(name), Some(like))) =>
(id, name, like)
case (id: String, (Some(name), None)) =>
(id, name, "此人无爱好")
case (id: String, (None, Some(like))) =>
(id, "查无此人", like)
}
resRDD3.foreach(println)
}
}