Обмен технологиями

искровое перемешивание — управление перемешиванием

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

ShuffleManager

Вход в систему перемешивания. ShuffleManager создается в sparkEnv в драйвере и исполнителе. Зарегистрируйте shuffle в драйвере и читайте и записывайте данные в исполнителе.

RegisterShuffle: зарегистрировать перемешивание, вернуть shuffleHandle
unregisterShuffle: удалить перемешивание
shuffleBlockResolver: получите shuffleBlockResolver, используемый для обработки взаимосвязи между перемешиванием и блокировкой.
getWriter: получить средство записи, соответствующее разделу, и вызвать его в задаче карты исполнителя.
getReader, getReaderForRange: получить средство чтения раздела диапазона и вызвать его в задаче сокращения исполнителя.

SortShuffleManager

Это единственная реализация shuffleManager.
При перемешивании на основе сортировки входящие сообщения сортируются по разделам и, наконец, выводятся в отдельный файл.
Редюсер прочитает область данных из этого файла.
Если выходной файл слишком велик и не помещается в памяти, отсортированный файл промежуточных результатов будет перенесен на диск, и эти промежуточные файлы будут объединены в окончательный файл для вывода.
Перетасовка на основе сортировки имеет два метода:

  • Для использования сериализованной сортировки необходимо соблюдение трех условий:
    1. Нет объединения на стороне карты
    2. Поддерживает перемещение сериализованных значений (пользовательский сериализатор KryoSerializer и sparkSql).
    3. Меньше или равно 16777216 разделов
  • Несериализованная сортировка, несериализованная сортировка может использоваться во всех остальных случаях.

Преимущества сериализованной сортировки
В режиме сериализованной сортировки модуль записи в случайном порядке сериализует входящие сообщения, сохраняет их в структуре данных и сортирует.

  1. Сортировка двоичных данных вместо объектов Java. Операции сортировки выполняются непосредственно над сериализованными двоичными данными, а не над объектами Java, что снижает потребление памяти и снижает накладные расходы на сборку мусора (GC).
    Эта оптимизация требует, чтобы используемый сериализатор записей имел определенные свойства, позволяющие переупорядочивать сериализованные записи без предварительной их десериализации.
  2. Эффективный алгоритм сортировки кэша. Используйте специально разработанный сортировщик с эффективным использованием кэша (ShuffleExternalSorter), который может сортировать массив указателей сжатых записей и идентификатор раздела. Занимая всего 8 байтов пространства на запись, эта стратегия позволяет разместить в кэше больше данных, тем самым повышая производительность.
  3. Процесс слияния переполнения выполняется для сериализованных блоков записей в одном разделе. Нет необходимости десериализовать записи в течение всего процесса слияния, что позволяет избежать ненужных затрат на преобразование данных.
  4. Если кодек сжатия переполнения поддерживает объединение сжатых данных, то процесс объединения переполнения просто объединяет сериализованные и сжатые данные раздела переполнения для формирования окончательного выходного раздела. Это позволяет использовать эффективные методы прямого копирования данных, такие как TransferTo в NIO, и позволяет избежать необходимости выделять буферы распаковки или копирования во время процесса слияния, повышая общую эффективность.

регистрацияПеремешать

Выберите соответствующий дескриптор в соответствии с различными сценариями. Порядок приоритетов: BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle.

Условие обхода: нет карты, количество разделов меньше или равно _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_

Условия дескриптора сериализации: класс сериализации поддерживает миграцию сериализованных объектов, не использует операцию mapSideCombine, а количество разделов родительского RDD не превышает (1 << 24).

получитьWriter

Сначала кэшируйте это перемешивание и сопоставьте информацию с TaskIdMapsForShuffle_.
Выберите соответствующего писателя на основе дескриптора, соответствующего перетасовке.
ОбходMergeSortShuffleHandle->ОбходMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

отменить регистрациюПеремешать

TaskIdMapsForShuffle удаляет соответствующее перемешивание и файлы, созданные в результате перемешивания соответствующей карты.

получитьReader/получитьReaderForRange

Получите все адреса блоков, соответствующие файлу перемешивания, то естьblocksByAddress.
Создает объект BlockStoreShuffleReader и возвращает его.

ShuffleHandle

В основном он используется для передачи параметров перемешивания, а также является меткой, указывающей, какой писатель выбрать.

BaseShuffleHandle

ОбходОбъединениеСортировкаПеремешиваниеHandle

SerializedShuffleHandle

ShuffleWriter

Абстрактный класс, отвечающий за выходные сообщения задачи карты. Основной метод — write, и существует три класса реализации.

  • ОбходСлияниеСортировкаПеремешиваниеWriter
  • СортировкаПеретасовкаWriter
  • НебезопасныйShuffleWriter

Позже будет проанализировано отдельно.

ShuffleBlockResolver

Особенность: класс реализации может получить соответствующие данные блока на основе MapId, ReducId, ShuffleId.

IndexShuffleBlockResolver

Единственный класс реализации ShuffleBlockResolver.
Создавайте и обслуживайте сопоставления между логическими блоками и физическими расположениями файлов для перемешивания данных блоков из одной и той же задачи сопоставления.
Данные блока перемешивания, принадлежащие одной и той же задаче карты, будут храниться в консолидированном файле данных.
Смещения этих блоков данных в файле данных сохраняются отдельно в индексном файле.
.data — это суффикс файла данных.
.index — это суффикс индексного файла.

получитьDataFile

Получите файл данных.
Создайте ShuffleDataBlockId и вызовите метод blockManager.diskBlockManager.getFile, чтобы получить файл.

получитьИндексФайл

Похоже на: getDataFile
Создайте ShuffleIndexBlockId и вызовите метод blockManager.diskBlockManager.getFile, чтобы получить файл.

удалитьDataByMap

Получите файл данных и индексный файл на основе shuffleId и MapId, а затем удалите их.

writeIndexFileAndCommit

Получите соответствующий файл данных и файл индекса в соответствии с идентификаторами карты и shuffleId.
Проверьте, существуют ли и совпадают ли файл данных и индексный файл, и верните их напрямую.
Если он не может совпасть, будет создан новый временный файл индекса. Затем переименуйте сгенерированный индексный файл и файл данных и вернитесь.


Предположим, что shuffle имеет три раздела, а соответствующие размеры данных равны 1000, 1500 и 2500 соответственно.
В индексном файле первая строка равна 0, за ней следует совокупное значение данных раздела. Вторая строка — 1000, третья строка — 1000+1500=2500, а третья строка — 2500+2500=5000.
Файлы данных хранятся отсортированные по размеру раздела.

проверитьИндексИФайлДанные

Проверьте, совпадают ли файл данных и индексный файл. Если они не совпадают, возвращается значение null. Если они совпадают, возвращается массив размеров раздела.
1. Размер индексного файла составляет (блоки + 1) * 8L.
2. Первая строка индексного файла равна 0.
3. Получите размер раздела и запишите его в длины. Суммарное значение длин равно размеру файла данных.
Если три вышеуказанных условия соблюдены, возвращается длина, в противном случае возвращается ноль.

получитьБлокДанные

Получить shuffleId, MapId, startReduceId, endReduceId
Получить индексный файл
Прочитайте соответствующие startOffset и endOffset.
Используйте файл данных startOffset, endOffset для создания FileSegmentManagedBuffer и возврата.