KATOエンジニヤリング開発日誌

「アウトプット無きエンジニアにインプットもチャンスも無い」の精神で書いています

「詳解 Apache Spark」の第3章を読んだ

「詳解 Apache Spark」の第3章を読んだまとめです。

www.kato-eng.info

RDDの基本と構造

RDDの性質

RDD(Resilient Distributed Dataset)はイミュータブルなデータ要素の分散コレクションである。SparkではこのRDDに対するデータ処理を行うことで分散処理を実現している。
RDDはパーティションと呼ばれる単位に分割されており、Sparkクラスの複数のノードに分散して配置される。Sparkではパーティション単位で処理が行われるため、大量のデータでも複数のノードで分散して処理できる。

f:id:masayuki_kato:20171006135134j:plain

上記の図は3つのワーカにRDDを4つのパーティションに分割して、分散配置していることを示している。

イミュータブルな性質とは、一度作成したオブジェクトはその後、状態を変えることができないということである。一度RDDを作成したら、そのRDDに対しては値の変更を行うことができない。
RDDは繰り返し利用することを前提に設計されているため、同一のRDDを複数の処理から同時に利用することがある。もしRDDの値が変更できてしまうと、下記のような不都合が生じる。

  • 処理の順番によって結果が変わることがある
  • Spark内部でデータのロック処理が必要になる
  • 値が変更されるたびにキャッシュを複数ノード間で同期する必要がある

SparkではRDDから新しいRDDの作成を繰り返していくことでデータ処理を実行する。

f:id:masayuki_kato:20171006140459j:plain

このようにSparkは処理の実行計画をRDDの変換による有向グラフとして持っている。全てのRDDの変換は遅延評価されて実行するので、変換操作が呼ばれた時点ではデータの処理が行われず、「RDDに対する操作」として記録のみ行う。

大規模なデータを処理するSparkでは変換操作が遅延評価されることで、不要なデータをメモリ上に保持したり、不要な処理を回避し本当に必要な処理にリソースを集中させることができるため、パフォーマンスの観点から重要な要素となっている。

但し、遅延評価を使うことで効率的な処理を実現できるとは限らない。一つのRDDから2種類の変換処理を用いて2つのRDDを生成するケースの場合、下図のような処理とはならない。

f:id:masayuki_kato:20171006144728j:plain

入力データを読み込んだRDDを生成し、異なる変換処理を行い別のRDDを生成している。だがこの場合は下図のように入力データからRDDを生成する操作が2回発生してしまう。

f:id:masayuki_kato:20171006212355j:plain

これはRDDを永続化することで解決できる。永続化することで同一データに対して繰り返し操作・計算を行う場合に効率的に処理できるようになる。永続先はメモリだけではなくディスクも選択できるので、メモリに収まりきらない量のデータも永続化できる。

f:id:masayuki_kato:20171006212814j:plain

変換とアクション

RDDの操作には変換(Transformation)とアクション(Action)の2種類あり、基本的にはこの2つを組み合わせて複雑な分散処理を実現させる。
変換とは既存のRDDに処理を加えて新しいRDDを作成する操作で、一般的なプログラミング言語におけるコレクションへの操作のようなものである。
アクションとは各ワーカでの処理をまとめて、ScalaでのArrayなどの通常のコレクションに変換したり、HDFSやS3などのストーレジに値を保存したりする操作である。

RDDはパーティション単位で分割されているため、フィルタリングといった操作はワーカ内の処理のみで動作するが、WordCountのような異なるパーティションをまたぐデータの集約が必要な処理の場合、Sparkではシャッフル(Shuffle)という機能により、異なるパーティションに含まれる同一キーの要素を同一パーティションにまとめる処理を行う。

f:id:masayuki_kato:20171006142606j:plain

シャッフルに関してはSpark側が全面的に引き受けて動作してくれるが、Sparkの特性上、シャッフルではノード間でネットワーク通信が発生するためアプリケーションのボトルネックとなりがちである。
この問題を避けるにはすべてのデータをシャッフルしたあとでフィルタリングをするのではなく、フィルタリングを行った後のデータをシャッフルするなどの、不要なシャッフルが発生しないように配慮が必要である。

パーティション

パーティション数の調整

Sparkではパーティション単位で分散して処理が行われるため、パーティション数は処理時間に大きく影響する。

  • パーティション数が少ない場合
    • 十分に分散できない
    • シャッフル時に一部のワーカに処理が偏る
    • メモリ使用量が増加し、処理が失敗してしまう
  • パーティション数が多すぎる場合
    • パーティション毎の変換・アクションのオーバーヘッドが大きくなってしまう

パーティション数は下記の数に収まるように調整することが推奨されている。

  • 大まかな目安
    • 100から10000パーティション
  • 下限値
    • CPUコア数の2倍
  • 上限値
    • 1タスクあたり最低100msを保証

パーティショナ

ペアRDDに対しシャッフルを伴う変換を実行すると、キー毎に同一のパーティションにまとめられ要素がグルーピングされる。これをRDDのパーティショニングと呼ぶ。そしてキーのグルーピング方法を決定するのがパーティショナである。

Sparkでは標準で下記2つのパーティショナが実装されている。

  • HashPartitioner
    • キーのハッシュ値にもとづいてパーティションを決定する
    • SortByKeyメソッド以外の大部分のメソッドで利用される
  • RangePartitioner
    • キーの範囲によってパーティションを決定する
    • 主にSortByKeyメソッドで利用される

RDDのシャッフルは複数ノード間でのネットワーク通信が発生するので、パフォーマンス向上のためにはこのネットワーク通信を最小限に抑えることが重要になってくる。joinメソッドでRDDの結合を行う場合では、両方のRDDが同じパーティショナでパーティショニングされていると同じキーの要素は同一ノード上に存在することになるのでシャッフルは行われない。