技術共有

スパークシャッフル——シャッフル管理

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

シャッフルマネージャー

シャッフルシステムへの入り口。 ShuffleManagerはドライバーとエグゼキューターのsparkEnv内に作成されます。ドライバにシャッフルを登録し、エグゼキュータでデータの読み書きを行います。

registerShuffle: シャッフルを登録し、shuffleHandle を返します。
unregisterShuffle: シャッフルを削除します
shuffleBlockResolver: シャッフルとブロックの間の関係を処理するために使用される shuffleBlockResolver を取得します。
getWriter: パーティションに対応するライターを取得し、エグゼキューターのマップタスクで呼び出します。
getReader、getReaderForRange: 範囲パーティションのリーダーを取得し、エグゼキューターの Reduce タスクで呼び出します。

ソートシャッフルマネージャー

shuffleManager の唯一の実装です。
ソートベースのシャッフルでは、受信メッセージがパーティションに従ってソートされ、最終的に別のファイルが出力されます。
リデューサーは、このファイルからデータの領域を読み取ります。
出力ファイルが大きすぎてメモリに収まらない場合、ソートされた中間結果ファイルがディスク上にあふれ、これらの中間ファイルは出力用の最終ファイルにマージされます。
ソートベースのシャッフルには 2 つの方法があります。

  • シリアル化された並べ替え。シリアル化された並べ替えを使用するには、次の 3 つの条件を満たす必要があります。
    1. マップ側コンバインなし
    2. シリアル化された値の再配​​置をサポート (KryoSerializer および sparSql カスタム シリアライザー)
    3. 16777216 以下のパーティション
  • 非シリアル化ソート、非シリアル化ソートは他のすべての場合に使用できます

シリアル化ソートの利点
シリアル化並べ替えモードでは、シャッフル ライターは受信メッセージをシリアル化してデータ構造に保存し、並べ替えます。

  1. Java オブジェクトではなくバイナリ データの並べ替え: 並べ替え操作は Java オブジェクトではなくシリアル化されたバイナリ データに対して直接実行されるため、メモリ消費が削減され、ガベージ コレクション (GC) のオーバーヘッドが削減されます。
    この最適化では、使用するレコード シリアライザーに、シリアル化されたレコードを最初に逆シリアル化せずに並べ替えることができる特定のプロパティが必要です。
  2. 効率的なキャッシュ ソート アルゴリズム: 圧縮されたレコード ポインター配列とパーティション ID をソートできる、キャッシュ効率の高い特別に設計されたソーター (ShuffleExternalSorter) を使用します。この戦略では、レコードごとに 8 バイトのスペースのみを使用するため、より多くのデータをキャッシュに格納できるようになり、パフォーマンスが向上します。
  3. オーバーフロー マージ プロセスは、同じパーティション内のシリアル化されたレコード ブロックに対して実行されます。マージ プロセス全体でレコードを逆シリアル化する必要がないため、不必要なデータ変換のオーバーヘッドが回避されます。
  4. オーバーフロー圧縮コーデックが圧縮データの連結をサポートしている場合、オーバーフロー マージ プロセスは、シリアル化および圧縮されたオーバーフロー パーティション データを単純に連結して、最終的な出力パーティションを形成します。これにより、NIO の transferTo などの効率的な直接データ コピー メソッドの使用が可能になり、マージ プロセス中に解凍やコピー バッファーを割り当てる必要がなくなり、全体的な効率が向上します。

登録シャッフル

さまざまなシナリオに応じて、対応するハンドルを選択してください。優先順位は BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle です。

バイパス条件: マップサイドなし、パーティションの数が _SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_ 以下である

シリアル化ハンドルの条件: シリアル化クラスはシリアル化オブジェクトの移行をサポートし、mapSideCombine 操作を使用せず、親 RDD のパーティション数は (1 << 24) 以下です。

取得ライター

まずこのシャッフルをキャッシュし、情報を taskIdMapsForShuffle_ にマップします
シャッフルに対応するハンドルに基づいて、対応するライターを選択します。
BypassMergeSortShuffleHandle->BypassMergeSortShuffleWriter
シリアル化されたShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter

登録解除シャッフル

taskIdMapsForShuffle は、対応するシャッフルと、シャッフル対応マップによって生成されたファイルを削除します

getReader/getReaderForRange

シャッフル ファイルに対応するすべてのブロック アドレス (blocksByAddress) を取得します。
BlockStoreShuffleReader オブジェクトを作成して返します。

シャッフルハンドル

主にシャッフルのパラメータを渡すために使用され、どのライターを選択するかを示すマークでもあります。

ベースシャッフルハンドル

バイパスマージソートシャッフルハンドル

シリアル化されたシャッフルハンドル

シャッフルライター

マップ タスクの出力メッセージを担当する抽象クラス。主なメソッドは write で、3 つの実装クラスがあります。

  • バイパスマージソートシャッフルライター
  • ソートシャッフルライター
  • 安全でないShuffleWriter

後ほど別途分析させていただきます。

シャッフルブロックリゾルバ

特性、実装クラスは、mapId、reduceId、shuffleId に基づいて対応するブロック データを取得できます。

インデックスシャッフルブロックリゾルバー

ShuffleBlockResolver の唯一の実装クラス。
同じマップ タスクからのシャッフル ブロック データの論理ブロックと物理ファイルの場所の間のマッピングを作成および維持します。
同じマップタスクに属するシャッフルブロックデータは、統合データファイルに保存されます。
データ ファイル内のこれらのデータ ブロックのオフセットは、インデックス ファイルに個別に保存されます。
.data はデータファイルの接尾辞です
.index はインデックス ファイルの接尾辞です。

データファイルを取得する

データファイルを取得します。
ShuffleDataBlockId を生成し、blockManager.diskBlockManager.getFile メソッドを呼び出してファイルを取得します

インデックスファイルを取得する

getDataFileに似ている
ShuffleIndexBlockId を生成し、blockManager.diskBlockManager.getFile メソッドを呼び出してファイルを取得します

マップによるデータの削除

shuffleIdとmapIdに基づいてデータファイルとインデックスファイルを取得し、削除します

書き込みインデックスファイルとコミット

mapIdとshuffleIdに従って、対応するデータファイルとインデックスファイルを取得します。
データファイルとインデックスファイルが存在し、一致するかどうかを確認し、直接リターンします。
一致しない場合は、新しいインデックス一時ファイルが生成されます。次に、生成されたインデックス ファイルとデータ ファイルの名前を変更して戻ります。


シャッフルに 3 つのパーティションがあり、対応するデータ サイズがそれぞれ 1000、1500、2500 であるとします。
インデックスファイルの1行目は0、2行目は1000、3行目は1000+1500=2500、3行目は2500+2500=5000となります。
データ ファイルはパーティション サイズごとにソートされて保存されます。

チェックインデックスとデータファイル

データファイルとインデックスファイルが一致するかどうかを確認し、一致しない場合は null を返し、一致する場合はパーティション サイズの配列を返します。
1.インデックスファイルのサイズは(ブロック+1)*8Lです。
2.インデックスファイルの1行目は0です
3. パーティションのサイズを取得し、長さに書き込みます。長さの合計値はデータ ファイルのサイズと等しくなります。
上記 3 つの条件が満たされる場合は長さが返され、そうでない場合は null が返されます。

ブロックデータを取得する

shuffleId、mapId、startReduceId、endReduceId を取得します
インデックスファイルを取得する
対応する startOffset と endOffset を読み取ります。
データ ファイル、startOffset、endOffset を使用して FileSegmentManagedBuffer を生成し、返します