Partage de technologie

spark shuffle——gestion du mélange

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Gestionnaire de mélange

L'entrée du système de lecture aléatoire. ShuffleManager est créé dans sparkEnv dans le pilote et l'exécuteur. Enregistrez le mélange dans le pilote et lisez et écrivez les données dans l'exécuteur.

registerShuffle : enregistre le shuffle, renvoie shuffleHandle
unregisterShuffle : supprimer la lecture aléatoire
shuffleBlockResolver : récupère shuffleBlockResolver, utilisé pour gérer la relation entre shuffle et block
getWriter : Récupère le rédacteur correspondant à la partition et l'appelle dans la tâche map de l'exécuteur
getReader, getReaderForRange : récupère le lecteur d'une partition de plage et l'appelle dans la tâche de réduction de l'exécuteur

Gestionnaire de tri aléatoire

Est la seule implémentation de shuffleManager.
Dans une lecture aléatoire basée sur le tri, les messages entrants sont triés en fonction des partitions et finalement un fichier séparé est généré.
Le réducteur lira une région de données de ce fichier.
Lorsque le fichier de sortie est trop volumineux pour tenir en mémoire, un fichier de résultats intermédiaire trié sera répandu sur le disque et ces fichiers intermédiaires seront fusionnés dans un fichier final pour la sortie.
La lecture aléatoire basée sur le tri comporte deux méthodes :

  • Tri sérialisé, trois conditions doivent être remplies pour utiliser le tri sérialisé :
    1. Pas de combinaison côté carte
    2. Prend en charge la relocalisation des valeurs sérialisées (sérialiseur personnalisé KryoSerializer et sparkSql)
    3. Inférieur ou égal à 16777216 partitions
  • Tri non sérialisé, le tri non sérialisé peut être utilisé dans tous les autres cas

Avantages du tri sérialisé
En mode de tri sérialisé, le rédacteur aléatoire sérialise les messages entrants, les enregistre dans une structure de données et les trie.

  1. Tri des données binaires au lieu des objets Java : les opérations de tri sont effectuées directement sur les données binaires sérialisées plutôt que sur les objets Java, ce qui réduit la consommation de mémoire et réduit la surcharge du garbage collection (GC).
    Cette optimisation nécessite que le sérialiseur d'enregistrements utilisé possède des propriétés spécifiques permettant de réorganiser les enregistrements sérialisés sans les désérialiser au préalable.
  2. Algorithme de tri de cache efficace : utilisez un trieur spécialement conçu et efficace en cache (ShuffleExternalSorter), qui peut trier le tableau de pointeurs d'enregistrement compressé et l'ID de partition. En occupant seulement 8 octets d'espace par enregistrement, cette stratégie permet de placer davantage de données dans le cache, améliorant ainsi les performances.
  3. Le processus de fusion par débordement est effectué sur les blocs d'enregistrements sérialisés dans la même partition. Il n'est pas nécessaire de désérialiser les enregistrements pendant tout le processus de fusion, ce qui évite une surcharge inutile de conversion de données.
  4. Si le codec de compression de débordement prend en charge la concaténation des données compressées, le processus de fusion par débordement concatène simplement les données de partition de débordement sérialisées et compressées pour former la partition de sortie finale. Cela permet l'utilisation de méthodes efficaces de copie directe de données, telles que transferTo dans NIO, et évite d'avoir à allouer des tampons de décompression ou de copie pendant le processus de fusion, améliorant ainsi l'efficacité globale.

registreShuffle

Choisissez la poignée correspondante selon différents scénarios. L'ordre de priorité est BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle

Condition de contournement : pas de mapside, le nombre de partitions est inférieur ou égal à _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_

Conditions de gestion de sérialisation : la classe de sérialisation prend en charge la migration d'objets sérialisés, n'utilise pas l'opération mapSideCombine et le nombre de partitions du RDD parent n'est pas supérieur à (1 << 24)

obtenir un écrivain

Mettez d’abord en cache ce mélange et mappez les informations dans taskIdMapsForShuffle_
Sélectionnez l'écrivain correspondant en fonction de la poignée correspondant à la lecture aléatoire.
ContournerMergeSortShuffleHandle->ContournerMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

désinscrireShuffle

taskIdMapsForShuffle supprime le shuffle correspondant et les fichiers générés par le shuffle correspondant

getReader/getReaderForRange

Obtenez toutes les adresses de bloc correspondant au fichier aléatoire, c'est-à-dire BlocksByAddress.
Crée un objet BlockStoreShuffleReader et le renvoie.

Poignée de mélange

Il est principalement utilisé pour passer les paramètres de lecture aléatoire, et c'est aussi une marque pour marquer quel écrivain choisir.

Poignée de baseShuffle

ContournerMergeSortShuffleHandle

Poignée de shuffle sérialisé

ShuffleWriter

Classe abstraite, responsable des messages de sortie des tâches de cartographie. La méthode principale est l'écriture et il existe trois classes d'implémentation.

  • ContournerMergeSortShuffleWriter
  • TrierShuffleWriter
  • UnsafeShuffleWriter

Sera analysé séparément plus tard.

Résolveur de blocs ShuffleBlock

Trait, la classe d'implémentation peut obtenir les données de bloc correspondantes en fonction de mapId, réduireId, shuffleId.

IndexShuffleBlockResolver

La seule classe d'implémentation de ShuffleBlockResolver.
Créez et gérez des mappages entre les blocs logiques et les emplacements de fichiers physiques pour mélanger les données de bloc de la même tâche de mappage.
Les données de blocs aléatoires appartenant à la même tâche cartographique seront stockées dans un fichier de données consolidé.
Les décalages de ces blocs de données dans le fichier de données sont stockés séparément dans un fichier d'index.
.data est le suffixe du fichier de données
.index est le suffixe du fichier d'index

obtenir le fichier de données

Obtenez le fichier de données.
Générez ShuffleDataBlockId et appelez la méthode blockManager.diskBlockManager.getFile pour obtenir le fichier

obtenir le fichier index

Semblable à getDataFile
Générez ShuffleIndexBlockId et appelez la méthode blockManager.diskBlockManager.getFile pour obtenir le fichier

supprimerDataByMap

Récupérez le fichier de données et le fichier d'index en fonction de shuffleId et mapId, puis supprimez-les

écrireIndexFileAndCommit

Obtenez le fichier de données et le fichier d'index correspondants selon mapId et shuffleId.
Vérifiez si le fichier de données et le fichier d'index existent et correspondent, et revenez directement.
S'il ne peut pas correspondre, un nouveau fichier temporaire d'index sera généré. Renommez ensuite le fichier d'index et le fichier de données générés et revenez.


Supposons que la lecture aléatoire comporte trois partitions et que les tailles de données correspondantes sont respectivement de 1 000, 1 500 et 2 500.
Dans le fichier d'index, la première ligne est 0, suivie de la valeur cumulée des données de partition, la deuxième ligne est 1 000, la troisième ligne est 1 000+1 500=2 500 et la troisième ligne est 2 500+2 500=5 000.
Les fichiers de données sont stockés triés par taille de partition.

vérifierIndexAndDataFile

Vérifiez si le fichier de données et le fichier d'index correspondent. S'ils ne correspondent pas, null est renvoyé. S'ils correspondent, un tableau de taille de partition est renvoyé.
1.La taille du fichier d'index est de (blocs + 1) * 8L
2.La première ligne du fichier d'index est 0
3. Obtenez la taille de la partition et écrivez-la en longueurs. La valeur récapitulative des longueurs est égale à la taille du fichier de données.
Si les trois conditions ci-dessus sont remplies, lengths est renvoyé, sinon null est renvoyé.

obtenir les données du bloc

Obtenez shuffleId, mapId, startReduceId, endReduceId
Récupérer le fichier d'index
Lire les startOffset et endOffset correspondants
Utilisez le fichier de données, startOffset, endOffset pour générer FileSegmentManagedBuffer et renvoyer