Berbagi teknologi

spark shuffle —— manajemen shuffle

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Manajer Acak

Pintu masuk ke sistem shuffle. ShuffleManager dibuat di sparkEnv di driver dan eksekutor. Daftarkan shuffle di driver dan baca dan tulis data di eksekutor.

registerShuffle: daftar acak, kembalikan shuffleHandle
unregisterShuffle: menghapus shuffle
shuffleBlockResolver: Dapatkan shuffleBlockResolver, digunakan untuk menangani hubungan antara shuffle dan block
getWriter: Dapatkan penulis yang sesuai dengan partisi dan panggil dalam tugas peta pelaksana
getReader, getReaderForRange: Dapatkan pembaca partisi rentang dan panggil dalam tugas pengurangan pelaksana

Pengelola Sortiran Acak

Apakah satu-satunya implementasi shuffleManager.
Dalam pengacakan berbasis pengurutan, pesan masuk diurutkan berdasarkan partisi, dan akhirnya file terpisah dikeluarkan.
Peredam akan membaca wilayah data dari file ini.
Ketika file keluaran terlalu besar untuk muat dalam memori, file hasil antara yang diurutkan akan tumpah ke disk, dan file perantara ini akan digabungkan menjadi file akhir untuk keluaran.
Pengacakan berbasis pengurutan memiliki dua metode:

  • Pengurutan berseri, tiga kondisi harus dipenuhi untuk menggunakan pengurutan berseri:
    1. Tidak ada penggabungan sisi peta
    2. Mendukung relokasi nilai serial (KryoSerializer dan serializer khusus sparkSql)
    3. Kurang dari atau sama dengan 16777216 partisi
  • Pengurutan non-serial, pengurutan non-serial dapat digunakan dalam semua kasus lainnya

Keuntungan dari pengurutan serial
Dalam mode pengurutan berseri, penulis acak membuat serial pesan masuk, menyimpannya dalam struktur data, dan mengurutkannya.

  1. Pengurutan data biner, bukan objek Java: Operasi pengurutan dilakukan langsung pada data biner serial, bukan pada objek Java, sehingga mengurangi konsumsi memori dan mengurangi overhead pengumpulan sampah (GC).
    Pengoptimalan ini mengharuskan serializer rekaman yang digunakan memiliki properti spesifik yang memungkinkan rekaman serial disusun ulang tanpa melakukan deserialisasi terlebih dahulu.
  2. Algoritme penyortiran cache yang efisien: Gunakan penyortir efisien cache yang dirancang khusus (ShuffleExternalSorter), yang dapat mengurutkan larik penunjuk catatan terkompresi dan ID partisi. Dengan hanya menggunakan 8 byte ruang per catatan, strategi ini memungkinkan lebih banyak data masuk ke dalam cache, sehingga meningkatkan kinerja.
  3. Proses penggabungan overflow dilakukan pada blok catatan berseri di partisi yang sama. Tidak perlu melakukan deserialisasi catatan selama seluruh proses penggabungan, sehingga menghindari overhead konversi data yang tidak perlu.
  4. Jika codec kompresi overflow mendukung penggabungan data terkompresi, maka proses penggabungan overflow hanya menggabungkan data partisi overflow yang diserialisasi dan dikompresi untuk membentuk partisi keluaran akhir. Hal ini memungkinkan penggunaan metode penyalinan data langsung yang efisien, seperti transferTo di NIO, dan menghindari kebutuhan untuk mengalokasikan dekompresi atau buffer penyalinan selama proses penggabungan, sehingga meningkatkan efisiensi secara keseluruhan.

daftarAcak

Pilih pegangan yang sesuai berdasarkan skenario yang berbeda. Urutan prioritasnya adalah BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle

Kondisi bypass: tidak ada sisi peta, jumlah partisi kurang dari atau sama dengan _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_

Kondisi penanganan serialisasi: Kelas serialisasi mendukung migrasi objek serial, tidak menggunakan operasi mapSideCombine, dan jumlah partisi RDD induk tidak lebih besar dari (1 << 24)

dapatkanPenulis

Pertama-tama cache pengacakan ini dan petakan informasi ke taskIdMapsForShuffle_
Pilih penulis yang sesuai berdasarkan pegangan yang sesuai dengan pengacakan.
LewatiGabungkanUrutkanAcakPegangan->LewatiGabungkanUrutkanAcakPenulis
Penanganan Acak Berseri->Penulis Acak Tidak Aman
Penanganan AcakBasis->Penulis AcakUrutan

batalkan pendaftaranAcak

taskIdMapsForShuffle menghapus pengacakan yang sesuai dan file yang dihasilkan oleh pengacakan peta yang sesuai

dapatkanPembaca/dapatkanPembacaUntukRentang

Dapatkan semua alamat blok yang sesuai dengan file shuffle, yaitu blockByAddress.
Membuat objek BlockStoreShuffleReader dan mengembalikannya.

AcakTangani

Ini terutama digunakan untuk meneruskan parameter pengacakan, dan juga merupakan tanda untuk menandai penulis mana yang harus dipilih.

Pegangan Acak Dasar

LewatiGabungkanUrutkanAcakTangani

Penanganan Acak Serialisasi

Penulis Acak

Kelas abstrak, bertanggung jawab atas pesan keluaran tugas peta. Metode utamanya adalah menulis, dan ada tiga kelas implementasi

  • LewatiGabungkanUrutkanAcakPenulis
  • Penulis AcakUrutkan
  • Penulis Acak yang Tidak Aman

Nanti akan dianalisis secara terpisah.

Pengacak Blok Penyelesai

Sifatnya, kelas implementasi dapat memperoleh data blok yang sesuai berdasarkan mapId, penguranganId, shuffleId.

IndeksShuffleBlockResolver

Satu-satunya kelas implementasi ShuffleBlockResolver.
Membuat dan memelihara pemetaan antara blok logis dan lokasi file fisik untuk mengacak data blok dari tugas peta yang sama.
Data blok acak milik tugas peta yang sama akan disimpan dalam file data gabungan.
Offset blok data ini dalam file data disimpan secara terpisah dalam file indeks.
.data adalah akhiran file data
.index adalah akhiran file indeks

dapatkanDataFile

Dapatkan file datanya.
Hasilkan ShuffleDataBlockId dan panggil metode blockManager.diskBlockManager.getFile untuk mendapatkan file

dapatkanIndeksFile

Mirip dengan getDataFile
Hasilkan ShuffleIndexBlockId dan panggil metode blockManager.diskBlockManager.getFile untuk mendapatkan file

hapusDataBerdasarkanPeta

Dapatkan file data dan file indeks berdasarkan shuffleId dan mapId, lalu hapus

tulisIndeksFileDanKomit

Dapatkan file data dan file indeks yang sesuai menurut mapId dan shuffleId.
Periksa apakah file data dan file indeks ada dan cocok, dan langsung kembali.
Jika tidak cocok, file sementara indeks baru akan dibuat. Kemudian ganti nama file indeks dan file data yang dihasilkan dan kembalikan.


Asumsikan shuffle memiliki tiga partisi, dan ukuran data terkait masing-masing adalah 1000, 1500, dan 2500.
Dalam file indeks, baris pertama adalah 0, diikuti dengan nilai kumulatif data partisi. Baris kedua adalah 1000, baris ketiga adalah 1000+1500=2500, dan baris ketiga adalah 2500+2500=5000.
File data disimpan diurutkan berdasarkan ukuran partisi.

periksaIndeksDanBerkasData

Verifikasi apakah file data dan file indeks cocok. Jika tidak cocok, null dikembalikan.
1.Ukuran file indeks adalah (blok + 1) * 8L
2.Baris pertama file indeks adalah 0
3. Dapatkan ukuran partisi dan tuliskan ke dalam panjangnya. Nilai ringkasan panjangnya sama dengan ukuran file data.
Jika ketiga kondisi di atas terpenuhi, panjang dikembalikan, jika tidak maka null dikembalikan.

dapatkanBlockData

Dapatkan shuffleId, mapId, startReduceId, endReduceId
Dapatkan file indeks
Baca startOffset dan endOffset yang sesuai
Gunakan file data, startOffset, endOffset untuk menghasilkan FileSegmentManagedBuffer dan kembali