Technologieaustausch

Spark Shuffle – Shuffle-Management

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

ShuffleManager

Der Eingang zum Shuffle-System. ShuffleManager wird in sparkEnv im Treiber und Executor erstellt. Registrieren Sie Shuffle im Treiber und lesen und schreiben Sie Daten im Executor.

registerShuffle: Shuffle registrieren, shuffleHandle zurückgeben
unregisterShuffle: Shuffle entfernen
shuffleBlockResolver: Ruft shuffleBlockResolver ab, mit dem die Beziehung zwischen Shuffle und Block verwaltet wird
getWriter: Rufen Sie den der Partition entsprechenden Writer ab und rufen Sie ihn in der Map-Aufgabe des Executors auf
getReader, getReaderForRange: Holen Sie sich den Reader einer Bereichspartition und rufen Sie ihn in der Reduzierungsaufgabe des Executors auf

SortShuffleManager

Ist die einzige Implementierung von shuffleManager.
Bei einem sortbasierten Shuffle werden eingehende Nachrichten nach Partitionen sortiert und schließlich in einer separaten Datei ausgegeben.
Der Reduzierer liest einen Datenbereich aus dieser Datei.
Wenn die Ausgabedatei zu groß ist, um in den Speicher zu passen, wird eine sortierte Zwischenergebnisdatei auf der Festplatte abgelegt und diese Zwischendateien werden für die Ausgabe zu einer endgültigen Datei zusammengeführt.
Sortierbasiertes Mischen verfügt über zwei Methoden:

  • Bei der serialisierten Sortierung müssen drei Bedingungen erfüllt sein, um die serialisierte Sortierung zu verwenden:
    1. Kein kartenseitiger Mähdrescher
    2. Unterstützt die Verschiebung serialisierter Werte (KryoSerializer und benutzerdefinierter SparkSql-Serializer)
    3. Kleiner oder gleich 16777216 Partitionen
  • Nicht-serialisierte Sortierung, in allen anderen Fällen kann die nicht-serialisierte Sortierung verwendet werden

Vorteile der serialisierten Sortierung
Im serialisierten Sortiermodus serialisiert der Shuffle-Writer eingehende Nachrichten, speichert sie in einer Datenstruktur und sortiert sie.

  1. Sortieren von Binärdaten anstelle von Java-Objekten: Sortiervorgänge werden direkt an serialisierten Binärdaten und nicht an Java-Objekten durchgeführt, was den Speicherverbrauch und den Garbage Collection (GC)-Overhead reduziert.
    Diese Optimierung erfordert, dass der verwendete Datensatzserialisierer über bestimmte Eigenschaften verfügt, die es ermöglichen, serialisierte Datensätze neu anzuordnen, ohne sie zuvor zu deserialisieren.
  2. Effizienter Cache-Sortieralgorithmus: Verwenden Sie einen speziell entwickelten Sortierer (ShuffleExternalSorter) mit hoher Cache-Effizienz, der das komprimierte Datensatzzeiger-Array und die Partitions-ID sortieren kann. Da diese Strategie nur 8 Byte Speicherplatz pro Datensatz beansprucht, können mehr Daten in den Cache passen, wodurch die Leistung verbessert wird.
  3. Der Überlauf-Zusammenführungsprozess wird für die serialisierten Datensatzblöcke in derselben Partition durchgeführt. Es besteht keine Notwendigkeit, die Datensätze während des gesamten Zusammenführungsprozesses zu deserialisieren, wodurch unnötiger Datenkonvertierungsaufwand vermieden wird.
  4. Wenn der Überlaufkomprimierungscodec die Verkettung komprimierter Daten unterstützt, verkettet der Überlaufzusammenführungsprozess einfach die serialisierten und komprimierten Überlaufpartitionsdaten, um die endgültige Ausgabepartition zu bilden. Dies ermöglicht die Verwendung effizienter direkter Datenkopiermethoden wie transferTo in NIO und vermeidet die Notwendigkeit, während des Zusammenführungsprozesses Dekomprimierungs- oder Kopierpuffer zuzuweisen, was die Gesamteffizienz verbessert.

registrierenShuffle

Wählen Sie den entsprechenden Griff je nach Szenario. Die Prioritätsreihenfolge ist BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle

Umgehungsbedingung: Keine Kartenseite, die Anzahl der Partitionen ist kleiner oder gleich _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_

Bedingungen für Serialisierungshandles: Die Serialisierungsklasse unterstützt die Migration serialisierter Objekte, verwendet nicht die MapSideCombine-Operation und die Anzahl der Partitionen des übergeordneten RDD ist nicht größer als (1 << 24).

getWriter

Cachen Sie zunächst diese Shuffle- und Map-Informationen in taskIdMapsForShuffle_
Wählen Sie den entsprechenden Autor basierend auf dem Handle aus, das dem Shuffle entspricht.
BypassMergeSortShuffleHandle->BypassMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

unregisterShuffle

taskIdMapsForShuffle entfernt die entsprechende Shuffle-Karte und die durch die Shuffle-Karte generierten Dateien

getReader/getReaderForRange

Rufen Sie alle Blockadressen ab, die der Shuffle-Datei entsprechen, also BlocksByAddress.
Erstellt ein BlockStoreShuffleReader-Objekt und gibt es zurück.

ShuffleHandle

Es wird hauptsächlich zum Übergeben der Shuffle-Parameter verwendet und ist auch eine Markierung, um zu markieren, welcher Autor ausgewählt werden soll.

BasisShuffleHandle

BypassMergeSortShuffleHandle

Serialisierter ShuffleHandle

ShuffleWriter

Abstrakte Klasse, verantwortlich für die Ausgabe von Kartenaufgaben. Die Hauptmethode ist Schreiben, und es gibt drei Implementierungsklassen

  • BypassMergeSortShuffleWriter
  • SortShuffleWriter
  • Unsicherer ShuffleWriter

Wird später separat analysiert.

ShuffleBlockResolver

Merkmal: Die Implementierungsklasse kann die entsprechenden Blockdaten basierend auf MapId, ReduceId und ShuffleId abrufen.

IndexShuffleBlockResolver

Die einzige Implementierungsklasse von ShuffleBlockResolver.
Erstellen und pflegen Sie Zuordnungen zwischen logischen Blöcken und physischen Dateispeicherorten für Shuffle-Blockdaten aus derselben Zuordnungsaufgabe.
Mischblockdaten, die zur gleichen Kartenaufgabe gehören, werden in einer konsolidierten Datendatei gespeichert.
Die Offsets dieser Datenblöcke in der Datendatei werden separat in einer Indexdatei gespeichert.
.data ist das Datendateisuffix
.index ist das Indexdateisuffix

Datendatei abrufen

Holen Sie sich die Datendatei.
Generieren Sie ShuffleDataBlockId und rufen Sie die Methode blockManager.diskBlockManager.getFile auf, um die Datei abzurufen

getIndexFile

Ähnlich wie getDataFile
Generieren Sie ShuffleIndexBlockId und rufen Sie die Methode blockManager.diskBlockManager.getFile auf, um die Datei abzurufen

Entfernen SieDataByMap

Rufen Sie die Datendatei und die Indexdatei basierend auf ShuffleId und MapId ab und löschen Sie sie dann

schreibeIndexDateiUndCommit

Rufen Sie die entsprechende Datendatei und Indexdatei gemäß MapId und ShuffleId ab.
Überprüfen Sie, ob die Datendatei und die Indexdatei vorhanden sind und übereinstimmen, und kehren Sie direkt zurück.
Wenn dies nicht der Fall ist, wird eine neue temporäre Indexdatei generiert. Benennen Sie dann die generierte Indexdatei und Datendatei um und kehren Sie zurück.


Angenommen, Shuffle hat drei Partitionen und die entsprechenden Datengrößen betragen 1000, 1500 bzw. 2500.
In der Indexdatei ist die erste Zeile 0, gefolgt vom kumulativen Wert der Partitionsdaten. Die zweite Zeile ist 1000, die dritte Zeile ist 1000+1500=2500 und die dritte Zeile ist 2500+2500=5000.
Datendateien werden nach Partitionsgröße sortiert gespeichert.

checkIndexAndDataFile

Überprüfen Sie, ob die Datendatei und die Indexdatei übereinstimmen. Wenn sie nicht übereinstimmen, wird null zurückgegeben. Wenn sie übereinstimmen, wird ein Array mit der Partitionsgröße zurückgegeben.
1. Die Größe der Indexdatei beträgt (Blöcke + 1) * 8L
2. Die erste Zeile der Indexdatei ist 0
3. Ermitteln Sie die Größe der Partition und schreiben Sie sie in Längen. Der Gesamtwert der Längen entspricht der Größe der Datendatei.
Wenn die oben genannten drei Bedingungen erfüllt sind, werden Längen zurückgegeben, andernfalls wird Null zurückgegeben.

Blockdaten abrufen

Rufen Sie ShuffleId, MapId, StartReduceId und EndReduceId ab
Holen Sie sich die Indexdatei
Lesen Sie den entsprechenden startOffset und endOffset
Verwenden Sie die Datendatei startOffset und endOffset, um FileSegmentManagedBuffer zu generieren und zurückzugeben