기술나눔

스파크 셔플 - 셔플 관리

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

셔플매니저

셔플 시스템의 입구입니다. ShuffleManager는 드라이버 및 실행 프로그램의 SparkEnv에 생성됩니다. 드라이버에 셔플을 등록하고 실행기에서 데이터를 읽고 씁니다.

RegisterShuffle: 셔플 등록, shuffleHandle 반환
unregisterShuffle: 셔플 제거
shuffleBlockResolver: 셔플과 블록 간의 관계를 처리하는 데 사용되는 shuffleBlockResolver를 가져옵니다.
getWriter: 파티션에 해당하는 Writer를 가져와서 Executor의 맵 작업에서 호출합니다.
getReader, getReaderForRange: 범위 파티션의 리더를 가져와서 실행기의 축소 작업에서 호출합니다.

정렬 셔플 관리자

shuffleManager의 유일한 구현입니다.
정렬 기반 셔플에서는 수신 메시지가 파티션에 따라 정렬되고 최종적으로 별도의 파일이 출력됩니다.
감속기는 이 파일에서 데이터 영역을 읽습니다.
출력 파일이 너무 커서 메모리에 맞지 않는 경우 정렬된 중간 결과 파일이 디스크에 유출되고 이러한 중간 파일은 출력을 위해 최종 파일로 병합됩니다.
정렬 기반 셔플에는 두 가지 방법이 있습니다.

  • 직렬화 정렬, 직렬화 정렬을 사용하려면 세 가지 조건을 충족해야 합니다.
    1. 지도측 결합 없음
    2. 직렬화된 값 재배치 지원(KryoSerializer 및 SparkSql 사용자 정의 직렬 변환기)
    3. 16777216개 이하의 파티션
  • 직렬화되지 않은 정렬, 직렬화되지 않은 정렬은 다른 모든 경우에 사용될 수 있습니다.

직렬 정렬의 장점
직렬화된 정렬 모드에서 셔플 작성기는 들어오는 메시지를 직렬화하고 이를 데이터 구조에 저장한 후 정렬합니다.

  1. Java 개체 대신 이진 데이터 정렬: 정렬 작업은 Java 개체가 아닌 직렬화된 이진 데이터에서 직접 수행되므로 메모리 소비가 줄어들고 GC(가비지 수집) 오버헤드가 줄어듭니다.
    이 최적화를 위해서는 사용된 레코드 직렬 변환기에 직렬화된 레코드를 먼저 역직렬화하지 않고도 다시 정렬할 수 있는 특정 속성이 있어야 합니다.
  2. 효율적인 캐시 정렬 알고리즘: 압축된 레코드 포인터 배열 및 파티션 ID를 정렬할 수 있도록 특별히 설계된 캐시 효율적 정렬기(ShuffleExternalSorter)를 사용합니다. 레코드당 8바이트의 공간만 차지함으로써 이 전략을 사용하면 더 많은 데이터를 캐시에 넣을 수 있으므로 성능이 향상됩니다.
  3. 오버플로 병합 프로세스는 동일한 파티션의 직렬화된 레코드 블록에서 수행됩니다. 전체 병합 프로세스 중에 레코드를 역직렬화할 필요가 없으므로 불필요한 데이터 변환 오버헤드가 방지됩니다.
  4. 오버플로 압축 코덱이 압축된 데이터 연결을 지원하는 경우 오버플로 병합 프로세스는 단순히 직렬화되고 압축된 오버플로 파티션 데이터를 연결하여 최종 출력 파티션을 형성합니다. 이를 통해 NIO의 transferTo와 같은 효율적인 직접 데이터 복사 방법을 사용할 수 있으며 병합 프로세스 중에 압축 해제 또는 복사 버퍼를 할당할 필요가 없어 전반적인 효율성이 향상됩니다.

등록Shuffle

다양한 시나리오에 따라 해당 핸들을 선택하십시오. 우선순위 순서는 BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle입니다.

우회 조건: 맵사이드가 없으며 파티션 수가 _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_보다 작거나 같습니다.

직렬화 핸들 조건: 직렬화 클래스는 직렬화된 개체의 마이그레이션을 지원하고, mapSideCombine 작업을 사용하지 않으며, 상위 RDD의 파티션 수가 (1 << 24)보다 크지 않습니다.

getWriter

먼저 이 셔플을 캐시하고 정보를 taskIdMapsForShuffle_에 매핑합니다.
셔플에 해당하는 핸들을 기준으로 해당 작가를 선택합니다.
BypassMergeSortShuffleHandle->BypassMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

등록 취소Shuffle

taskIdMapsForShuffle은 해당 셔플과 셔플 해당 맵에 의해 생성된 파일을 제거합니다.

getReader/getReaderForRange

셔플 파일에 해당하는 모든 블록 주소, 즉 blockByAddress를 가져옵니다.
BlockStoreShuffleReader 객체를 생성하고 반환합니다.

셔플 핸들

주로 셔플의 매개변수를 전달하는 데 사용되며, 어떤 작가를 선택할지 표시하는 표시이기도 합니다.

베이스셔플핸들

바이패스병합정렬셔플핸들

직렬화된 셔플 핸들

셔플라이터

맵 작업 출력 메시지를 담당하는 추상 클래스입니다. 기본 메서드는 쓰기이며 세 가지 구현 클래스가 있습니다.

  • BypassMergeSortShuffleWriter
  • 정렬 셔플 작성기
  • 안전하지 않은 셔플 작성자

나중에 따로 분석하겠습니다.

셔플블록리졸버

특성상, 구현 클래스는 mapId, ReduceId, shuffleId를 기반으로 해당 블록 데이터를 얻을 수 있습니다.

IndexShuffleBlockResolver

ShuffleBlockResolver의 유일한 구현 클래스입니다.
동일한 맵 작업에서 셔플 블록 데이터에 대한 논리 블록과 실제 파일 위치 간의 매핑을 생성하고 유지합니다.
동일한 맵 작업에 속하는 셔플 블록 데이터는 통합된 데이터 파일에 저장됩니다.
데이터 파일에 있는 이러한 데이터 블록의 오프셋은 인덱스 파일에 별도로 저장됩니다.
.data는 데이터 파일 접미사입니다.
.index는 인덱스 파일 접미사입니다.

getDataFile

데이터 파일을 가져옵니다.
ShuffleDataBlockId를 생성하고 blockManager.diskBlockManager.getFile 메서드를 호출하여 파일을 가져옵니다.

getIndexFile

getDataFile 과 유사함
ShuffleIndexBlockId를 생성하고 blockManager.diskBlockManager.getFile 메서드를 호출하여 파일을 가져옵니다.

removeDataByMap

shuffleId 및 mapId를 기반으로 데이터 파일과 인덱스 파일을 가져온 후 삭제합니다.

writeIndexFileAndCommit

mapId 및 shuffleId에 따라 해당 데이터 파일과 인덱스 파일을 얻습니다.
데이터 파일과 인덱스 파일이 존재하는지 확인하고 일치하는지 확인하고 직접 반환합니다.
일치하지 않으면 새 인덱스 임시 파일이 생성됩니다. 그런 다음 생성된 인덱스 파일과 데이터 파일의 이름을 바꾸고 반환합니다.


Shuffle에 세 개의 파티션이 있고 해당 데이터 크기가 각각 1000, 1500, 2500이라고 가정합니다.
인덱스 파일에서 첫 번째 줄은 0이고, 두 번째 줄은 1000, 세 번째 줄은 1000+1500=2500, 세 번째 줄은 2500+2500=5000입니다.
데이터 파일은 파티션 크기별로 정렬되어 저장됩니다.

checkIndexAndDataFile

데이터 파일과 인덱스 파일이 일치하는지 확인합니다. 일치하지 않으면 null이 반환됩니다. 일치하면 파티션 크기의 배열이 반환됩니다.
1.인덱스 파일 크기는 (블록 + 1) * 8L입니다.
2.인덱스 파일의 첫 번째 줄은 0입니다.
3. 파티션의 크기를 가져와서 길이로 기록합니다. 길이의 요약 값은 데이터 파일의 크기와 같습니다.
위의 세 가지 조건이 충족되면 길이가 반환되고, 그렇지 않으면 null이 반환됩니다.

블록데이터 가져오기

shuffleId, mapId, startReduceId, endReduceId 가져오기
인덱스 파일 가져오기
해당 startOffset 및 endOffset을 읽습니다.
데이터 파일 startOffset, endOffset을 사용하여 FileSegmentManagedBuffer를 생성하고 반환