Condivisione della tecnologia

spark shuffle: gestione della riproduzione casuale

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Gestore di Shuffle

L'ingresso al sistema shuffle. ShuffleManager viene creato in sparkEnv nel driver e nell'esecutore. Registra lo shuffle nel driver e leggi e scrivi i dati nell'esecutore.

RegisterShuffle: registra lo shuffle, restituisce shuffleHandle
unregisterShuffle: rimuove la riproduzione casuale
shuffleBlockResolver: ottieni shuffleBlockResolver, utilizzato per gestire la relazione tra shuffle e blocco
getWriter: ottiene lo scrittore corrispondente alla partizione e lo chiama nell'attività di mappa dell'esecutore
getReader, getReaderForRange: ottiene il lettore di una partizione di intervallo e lo chiama nell'attività di riduzione dell'esecutore

Gestore OrdinaMescola

È l'unica implementazione di shuffleManager.
In uno shuffle basato sull'ordinamento, i messaggi in arrivo vengono ordinati in base alle partizioni e infine viene generato un file separato.
Il riduttore leggerà una regione di dati da questo file.
Quando il file di output è troppo grande per essere contenuto in memoria, sul disco verrà distribuito un file di risultati intermedio ordinato e questi file intermedi verranno uniti in un file finale per l'output.
La riproduzione casuale basata sull'ordinamento prevede due metodi:

  • Ordinamento serializzato, per utilizzare l'ordinamento serializzato devono essere soddisfatte tre condizioni:
    1. Nessuna combinazione lato mappa
    2. Supporta il riposizionamento dei valori serializzati (KryoSerializer e serializzatore personalizzato sparkSql)
    3. Minore o uguale a 16777216 partizioni
  • L'ordinamento non serializzato può essere utilizzato in tutti gli altri casi

Vantaggi dell'ordinamento serializzato
Nella modalità di ordinamento serializzato, lo scrittore casuale serializza i messaggi in arrivo, li salva in una struttura dati e li ordina.

  1. Ordinamento di dati binari anziché di oggetti Java: le operazioni di ordinamento vengono eseguite direttamente su dati binari serializzati anziché su oggetti Java, riducendo così il consumo di memoria e il sovraccarico della Garbage Collection (GC).
    Questa ottimizzazione richiede che il serializzatore di record utilizzato disponga di proprietà specifiche che consentano di riordinare i record serializzati senza prima deserializzarli.
  2. Algoritmo di ordinamento della cache efficiente: utilizzare un selezionatore efficiente della cache appositamente progettato (ShuffleExternalSorter), in grado di ordinare l'array di puntatori di record compressi e l'ID di partizione. Occupando solo 8 byte di spazio per record, questa strategia consente di inserire più dati nella cache, migliorando così le prestazioni.
  3. Il processo di fusione in overflow viene eseguito sui blocchi di record serializzati nella stessa partizione. Non è necessario deserializzare i record durante l'intero processo di fusione, evitando inutili sovraccarichi di conversione dei dati.
  4. Se il codec di compressione dell'overflow supporta la concatenazione dei dati compressi, il processo di unione dell'overflow concatena semplicemente i dati della partizione di overflow serializzati e compressi per formare la partizione di output finale. Ciò consente l'uso di metodi efficienti di copia diretta dei dati, come transferTo in NIO, ed evita la necessità di allocare buffer di decompressione o copia durante il processo di unione, migliorando l'efficienza complessiva.

RegistratiShuffle

Scegli la maniglia corrispondente in base ai diversi scenari. L'ordine di priorità è BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle

Condizione di bypass: nessun lato mappa, il numero di partizioni è inferiore o uguale a _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_

Condizioni di gestione della serializzazione: la classe di serializzazione supporta la migrazione di oggetti serializzati, non utilizza l'operazione mapSideCombine e il numero di partizioni dell'RDD padre non è maggiore di (1 << 24)

ottenereScrittore

Per prima cosa memorizza nella cache questo shuffle e mappa le informazioni in taskIdMapsForShuffle_
Seleziona lo scrittore corrispondente in base alla maniglia corrispondente allo shuffle.
BypassMergeSortShuffleHandle->BypassMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

annulla registrazioneMescola

taskIdMapsForShuffle rimuove lo shuffle corrispondente e i file generati dalla mappa corrispondente dello shuffle

getReader/getReaderForRange

Ottieni tutti gli indirizzi di blocco corrispondenti al file shuffle, ovvero blocchiByAddress.
Crea un oggetto BlockStoreShuffleReader e lo restituisce.

Maniglia Shuffle

Viene utilizzato principalmente per passare i parametri di shuffle ed è anche un segno per contrassegnare quale scrittore scegliere.

BaseShuffleHandle

BypassMergeSortShuffleHandle

SerializzatoShuffleHandle

Scrittore di Shuffle

Classe astratta, responsabile dei messaggi di output dell'attività della mappa. Il metodo principale è write e sono disponibili tre classi di implementazione

  • BypassMergeSortShuffleWriter
  • OrdinaScrittoreMescola
  • Scrittore non sicuro di Shuffle

Verranno analizzati separatamente in seguito.

Risolutore di blocchi Shuffle

Tratto, la classe di implementazione può ottenere i dati del blocco corrispondente in base a mapId, reduceId, shuffleId.

IndexShuffleBlockResolver

L'unica classe di implementazione di ShuffleBlockResolver.
Crea e mantieni mappature tra blocchi logici e posizioni di file fisici per i dati dei blocchi mescolati dalla stessa attività di mappa.
I dati dei blocchi casuali appartenenti alla stessa attività della mappa verranno archiviati in un file di dati consolidato.
Gli offset di questi blocchi di dati nel file di dati vengono memorizzati separatamente in un file di indice.
.data è il suffisso del file di dati
.index è il suffisso del file indice

Ottieni file di dati

Ottieni il file di dati.
Genera ShuffleDataBlockId e chiama il metodo blockManager.diskBlockManager.getFile per ottenere il file

OttieniIndiceFile

Simile a getDataFile
Genera ShuffleIndexBlockId e chiama il metodo blockManager.diskBlockManager.getFile per ottenere il file

rimuoviDataByMap

Ottieni il file di dati e il file di indice in base a shuffleId e mapId, quindi eliminali

scriviIndexFileAndCommit

Ottieni il file di dati e il file di indice corrispondenti in base a mapId e shuffleId.
Controlla se il file di dati e il file di indice esistono e corrispondono e restituiscono direttamente.
Se non può corrispondere, verrà generato un nuovo file temporaneo di indice. Quindi rinominare il file di indice e il file di dati generati e restituire.


Supponiamo che la riproduzione casuale abbia tre partizioni e che le dimensioni dei dati corrispondenti siano rispettivamente 1000, 1500 e 2500.
Nel file di indice, la prima riga è 0, seguita dal valore cumulativo dei dati della partizione. La seconda riga è 1000, la terza riga è 1000+1500=2500 e la terza riga è 2500+2500=5000.
I file di dati vengono archiviati ordinati in base alla dimensione della partizione.

controllaIndiceEFileDati

Verificare se il file di dati e il file di indice corrispondono. Se non corrispondono, viene restituito null. Se corrispondono, viene restituito un array di dimensioni della partizione.
1.La dimensione del file indice è (blocchi + 1) * 8L
2.La prima riga del file indice è 0
3. Ottieni la dimensione della partizione e scrivila in lunghezze. Il valore di riepilogo delle lunghezze è uguale alla dimensione del file di dati.
Se le tre condizioni precedenti sono soddisfatte, viene restituito lengths, altrimenti viene restituito null.

Ottieni dati di blocco

Ottieni shuffleId, mapId, startReduceId, endReduceId
Ottieni il file indice
Leggere i corrispondenti startOffset e endOffset
Utilizza il file di dati, startOffset, endOffset per generare FileSegmentManagedBuffer e restituire