Spark shuffle: shuffle management

2024-07-12

ShuffleManager

ShuffleManager is the entry point of the shuffle system. It is created in SparkEnv on both the driver and the executors. Shuffles are registered on the driver; data is written and read on the executors.

registerShuffle: registers a shuffle and returns a ShuffleHandle
unregisterShuffle: removes a shuffle
shuffleBlockResolver: returns the ShuffleBlockResolver, which handles the relationship between shuffles and blocks
getWriter: returns the writer for a given map partition; called by map tasks on executors
getReader, getReaderForRange: return a reader for a range of reduce partitions; called by reduce tasks on executors

SortShuffleManager

SortShuffleManager is the only implementation of ShuffleManager.
In a sort-based shuffle, incoming records are sorted by target partition ID and written to a single map output file.
Each reducer reads its own contiguous region of this file.
When the map output is too large to fit in memory, sorted subsets are spilled to disk as intermediate files, which are then merged into the final output file.
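The sort, spill, and merge flow described above can be sketched in a few lines. This is a simplified illustration, not Spark's implementation: the modulo partitioner, the `max_in_memory` record limit, and the in-memory lists standing in for spill files are all assumptions made for the example.

```python
import heapq

def partition_of(key, num_partitions):
    # Hypothetical partitioner: non-negative integer keys modulo the partition count.
    return key % num_partitions

def write_sorted_runs(records, num_partitions, max_in_memory):
    """Buffer records and 'spill' a run sorted by partition ID whenever the buffer fills."""
    runs, buffer = [], []
    for key, value in records:
        buffer.append((partition_of(key, num_partitions), key, value))
        if len(buffer) >= max_in_memory:
            runs.append(sorted(buffer, key=lambda r: r[0]))  # one spill "file"
            buffer = []
    if buffer:
        runs.append(sorted(buffer, key=lambda r: r[0]))
    return runs

def merge_runs(runs):
    """Merge the sorted runs into one output ordered by partition ID."""
    return list(heapq.merge(*runs, key=lambda r: r[0]))

records = [(5, "a"), (2, "b"), (7, "c"), (4, "d"), (1, "e")]
runs = write_sorted_runs(records, num_partitions=3, max_in_memory=2)
merged = merge_runs(runs)
print([r[0] for r in merged])  # partition IDs in the final output
```

Because every run is already sorted by partition ID, the merge only needs a streaming k-way merge, and each reducer's data ends up in one contiguous region of the output.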
Sort-based shuffle has two paths for producing the map output:

  • Serialized sort, used when all three of these conditions are met:
    1. No map-side combine
    2. The serializer supports relocation of serialized values (KryoSerializer and Spark SQL's custom serializers)
    3. The shuffle produces at most 16777216 (1 << 24) output partitions
  • Non-serialized sort, used in all other cases

Advantages of serialized sort
In serialized sort mode, the shuffle writer serializes incoming records as soon as they arrive, stores them in a buffer, and sorts them in serialized form.

  1. Sorting binary data instead of Java objects: the sort operates directly on serialized binary data, which reduces memory consumption and garbage collection (GC) overhead.
    This optimization requires the record serializer to have certain properties so that serialized records can be reordered without being deserialized first.
  2. Cache-efficient sorting: a specially designed cache-efficient sorter (ShuffleExternalSorter) sorts arrays of compressed record pointers and partition IDs. Each record takes only 8 bytes in the sort array, so more of the array fits into the CPU cache, which improves performance.
  3. Spill merging works on blocks of serialized records that belong to the same partition. Records never need to be deserialized during the merge, which avoids unnecessary data conversion.
  4. If the spill compression codec supports concatenation of compressed data, the merge simply concatenates the serialized, compressed spill partitions to form the final output partition. This allows efficient direct data copies, such as NIO's transferTo, and avoids allocating decompression or copy buffers during the merge.
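To make the "8 bytes per record" point concrete, here is a minimal sketch of packing a partition ID and a record address into one 64-bit value. The 24/13/27 bit split (partition ID, memory page number, offset within the page) reflects my understanding of Spark's PackedRecordPointer and should be treated as an assumption; the function names are hypothetical.

```python
PARTITION_BITS = 24  # assumed: upper 24 bits hold the partition ID
PAGE_BITS = 13       # assumed: next 13 bits hold the memory page number
OFFSET_BITS = 27     # assumed: lower 27 bits hold the offset within the page

MAX_PARTITION_ID = (1 << PARTITION_BITS) - 1

def pack(partition_id, page_number, offset_in_page):
    """Pack the three fields into a single 64-bit integer."""
    assert 0 <= partition_id <= MAX_PARTITION_ID
    assert 0 <= page_number < (1 << PAGE_BITS)
    assert 0 <= offset_in_page < (1 << OFFSET_BITS)
    return (
        (partition_id << (PAGE_BITS + OFFSET_BITS))
        | (page_number << OFFSET_BITS)
        | offset_in_page
    )

def partition_of(packed):
    """Recover the partition ID from the upper bits."""
    return packed >> (PAGE_BITS + OFFSET_BITS)

pointers = [pack(7, 0, 64), pack(2, 1, 0), pack(7, 0, 0), pack(0, 3, 128)]
pointers.sort()  # plain integer sort, no record is deserialized
print([partition_of(p) for p in pointers])  # [0, 2, 7, 7]
```

Since the partition ID occupies the highest bits, sorting the packed integers orders the records by partition. A 24-bit partition field also explains the limit of 1 << 24 = 16777216 partitions for serialized sort.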

registerShuffle

Selects the handle that fits the scenario. The priority is BypassMergeSortShuffleHandle > SerializedShuffleHandle > BaseShuffleHandle.

Bypass condition: no map-side combine, and the number of partitions is less than or equal to SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD (spark.shuffle.sort.bypassMergeThreshold).

Serialized handle conditions: the serializer supports relocation of serialized objects, map-side combine is not used, and the number of output partitions of the shuffle is not greater than (1 << 24).
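The selection order above can be written as a small decision function. This is a sketch only: the `Dependency` class is a hypothetical stand-in for the information Spark reads from the shuffle dependency, and the default threshold of 200 is the documented default of spark.shuffle.sort.bypassMergeThreshold as far as I know.

```python
from dataclasses import dataclass

MAX_SERIALIZED_PARTITIONS = 1 << 24  # 16777216

@dataclass
class Dependency:
    # Hypothetical stand-in for the fields consulted on the shuffle dependency.
    num_partitions: int
    map_side_combine: bool
    serializer_supports_relocation: bool

def select_handle(dep, bypass_threshold=200):
    """Return the name of the handle chosen, checking the cases in priority order."""
    if not dep.map_side_combine and dep.num_partitions <= bypass_threshold:
        return "BypassMergeSortShuffleHandle"
    if (
        dep.serializer_supports_relocation
        and not dep.map_side_combine
        and dep.num_partitions <= MAX_SERIALIZED_PARTITIONS
    ):
        return "SerializedShuffleHandle"
    return "BaseShuffleHandle"

print(select_handle(Dependency(100, False, True)))   # BypassMergeSortShuffleHandle
print(select_handle(Dependency(1000, False, True)))  # SerializedShuffleHandle
print(select_handle(Dependency(1000, True, True)))   # BaseShuffleHandle
```

Note that a shuffle with few partitions and no map-side combine takes the bypass path even when its serializer would also qualify for serialized sort.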

getWriter

First, records the shuffle and its map task in taskIdMapsForShuffle.
Then selects the writer that matches the shuffle's handle:
BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
SerializedShuffleHandle -> UnsafeShuffleWriter
BaseShuffleHandle -> SortShuffleWriter

unregisterShuffle

Removes the shuffle's entry from taskIdMapsForShuffle and deletes the files generated by each of that shuffle's map tasks.

getReader/getReaderForRange

Gets the addresses of all blocks that hold the requested shuffle data, i.e. blocksByAddress.
Then creates a BlockStoreShuffleReader object and returns it.

ShuffleHandle

Mainly used to pass the shuffle's parameters; its concrete type also acts as a marker that determines which writer is selected.

BaseShuffleHandle

BypassMergeSortShuffleHandle

SerializedShuffleHandle

ShuffleWriter

An abstract class responsible for writing a map task's output records. Its main method is write, and it has three implementations:

  • BypassMergeSortShuffleWriter
  • SortShuffleWriter
  • UnsafeShuffleWriter

Each of these will be analyzed separately later.

ShuffleBlockResolver

A trait. Implementations retrieve the block data that corresponds to a given shuffleId, mapId, and reduceId.

IndexShuffleBlockResolver

The only implementation of ShuffleBlockResolver.
It creates and maintains the mapping between logical blocks and physical file locations for the shuffle blocks written by the same map task.
All shuffle blocks that belong to the same map task are stored in one consolidated data file.
The offsets of these blocks within the data file are stored separately in an index file.
.data is the suffix of the data file.
.index is the suffix of the index file.

getDataFile

Gets the data file.
Builds a ShuffleDataBlockId and calls blockManager.diskBlockManager.getFile to obtain the file.

getIndexFile

Similar to getDataFile.
Builds a ShuffleIndexBlockId and calls blockManager.diskBlockManager.getFile to obtain the file.
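As a rough illustration of how the two block IDs turn into file names, the sketch below builds the names from shuffleId and mapId. The `shuffle_<shuffleId>_<mapId>_<reduceId>` pattern and the fixed reduce ID of 0 reflect my understanding of Spark's block naming and are assumptions here; the helper functions are hypothetical.

```python
NOOP_REDUCE_ID = 0  # assumed placeholder reduce ID, since one file covers all reducers

def data_file_name(shuffle_id, map_id):
    """Name derived from a ShuffleDataBlockId-style identifier (assumed format)."""
    return f"shuffle_{shuffle_id}_{map_id}_{NOOP_REDUCE_ID}.data"

def index_file_name(shuffle_id, map_id):
    """Name derived from a ShuffleIndexBlockId-style identifier (assumed format)."""
    return f"shuffle_{shuffle_id}_{map_id}_{NOOP_REDUCE_ID}.index"

print(data_file_name(0, 5))   # shuffle_0_5_0.data
print(index_file_name(0, 5))  # shuffle_0_5_0.index
```

Both names share the same prefix, so the data file and index file of one map task are easy to pair up on disk.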

removeDataByMap

Looks up the data file and index file for the given shuffleId and mapId, then deletes them.

writeIndexFileAndCommit

Gets the data file and index file for the given shuffleId and mapId.
If the data file and index file already exist and match each other, returns directly.
Otherwise, writes a new temporary index file, then renames the new index file and data file to their final names and returns.


For example, assume the shuffle has three partitions whose data sizes are 1000, 1500, and 2500 bytes.
In the index file, the first entry is 0, followed by the cumulative sizes of the partition data: the second entry is 1000, the third is 1000+1500=2500, and the fourth is 2500+2500=5000.
The data file stores the partitions' data back to back, in partition order.
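The index layout from this example can be reproduced with a short sketch. It assumes each offset is stored as an 8-byte big-endian signed integer, which is what Java's DataOutputStream.writeLong produces; the function name is hypothetical.

```python
import struct

def build_index(partition_lengths):
    """Return the cumulative offsets and their binary encoding."""
    offsets = [0]
    for length in partition_lengths:
        offsets.append(offsets[-1] + length)
    # Each offset is encoded as an 8-byte big-endian signed integer.
    index_bytes = struct.pack(f">{len(offsets)}q", *offsets)
    return offsets, index_bytes

offsets, index_bytes = build_index([1000, 1500, 2500])
print(offsets)           # [0, 1000, 2500, 5000]
print(len(index_bytes))  # 32, i.e. (3 + 1) * 8
```

Partition i occupies the byte range from offsets[i] up to, but not including, offsets[i + 1] in the data file.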

checkIndexAndDataFile

Checks whether the data file and index file match. If they do not match, returns null. If they match, returns an array of the partition lengths.
1. The index file size is (blocks + 1) * 8 bytes
2. The first entry of the index file is 0
3. The length of each partition is computed from consecutive offsets and written into lengths. The sum of lengths equals the size of the data file.
If all three conditions are met, lengths is returned; otherwise null is returned.
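The three checks can be sketched as follows. To keep the example self-contained, it takes the index content as bytes and the data file size as a number rather than opening real files; the function name and the 8-byte big-endian encoding are assumptions for the illustration.

```python
import struct

def check_index_and_data(index_bytes, data_size, blocks):
    """Return the partition lengths if index and data agree, else None."""
    # 1. The index must hold exactly blocks + 1 offsets of 8 bytes each.
    if len(index_bytes) != (blocks + 1) * 8:
        return None
    offsets = struct.unpack(f">{blocks + 1}q", index_bytes)
    # 2. The first offset must be 0.
    if offsets[0] != 0:
        return None
    # 3. Partition lengths are differences of consecutive offsets,
    #    and they must add up to the data file size.
    lengths = [offsets[i + 1] - offsets[i] for i in range(blocks)]
    if sum(lengths) != data_size:
        return None
    return lengths

index_bytes = struct.pack(">4q", 0, 1000, 2500, 5000)
print(check_index_and_data(index_bytes, 5000, 3))  # [1000, 1500, 2500]
print(check_index_and_data(index_bytes, 4999, 3))  # None
```

Returning the lengths rather than a boolean lets the caller reuse an existing, valid output instead of recomputing the partition sizes.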

getBlockData

1. Gets shuffleId, mapId, startReduceId, and endReduceId from the block ID.
2. Gets the index file.
3. Reads the startOffset and endOffset that correspond to startReduceId and endReduceId.
4. Creates a FileSegmentManagedBuffer from the data file, startOffset, and endOffset, and returns it.
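The offset lookup in these steps can be sketched as below. The example uses in-memory bytes in place of the index and data files, treats endReduceId as exclusive, and assumes 8-byte big-endian offsets; the function names are hypothetical, and the returned (offset, length) pair only stands in for what a FileSegmentManagedBuffer would describe.

```python
import struct

def locate_segment(index_bytes, start_reduce_id, end_reduce_id):
    """Return (offset, length) of the data covering [start_reduce_id, end_reduce_id)."""
    # Entry i of the index sits at byte position i * 8.
    (start_offset,) = struct.unpack_from(">q", index_bytes, start_reduce_id * 8)
    (end_offset,) = struct.unpack_from(">q", index_bytes, end_reduce_id * 8)
    return start_offset, end_offset - start_offset

def read_segment(data_bytes, index_bytes, start_reduce_id, end_reduce_id):
    """Slice the requested segment out of the data."""
    offset, length = locate_segment(index_bytes, start_reduce_id, end_reduce_id)
    return data_bytes[offset:offset + length]

index_bytes = struct.pack(">4q", 0, 1000, 2500, 5000)
data_bytes = b"a" * 1000 + b"b" * 1500 + b"c" * 2500

print(locate_segment(index_bytes, 1, 2))  # (1000, 1500)
print(locate_segment(index_bytes, 1, 3))  # (1000, 4000)
```

Only two index entries are read regardless of how many partitions the range covers, which is why a range of reduce partitions can be served as one contiguous file segment.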