HiveとImpalaにワークフロースケジューラーを入れてみた(後編)

こんにちは、CyberZのエンジニアの遠藤です。

もうすっかり年末になってしまいました。1年経つのは早いですね。 そういえばCDH5.5でImpala 2.3がリリースされました。待望(ようやく)のnested type対応です。 先日、CDHのバージョン上げたばかりなのですが、これは早くCDH5.5にしないと。。

さて、ちょっと時間が空いてしまったのですが、「Patriot-workflow-scheduler」(以下Patriot)導入の後編を書きたいと思います。

前編では、簡単な選定理由と導入状況を書きました。 後編ではもう少し詳しい選定理由(Patriotの良い所)と現在のシステム構成と冗長構成について書きたいと思います。

選定理由

選定理由としては以下の4点でした。

  1. 依存関係が張りやすい

  2. 信頼性が高い

  3. その気になれば柔軟に機能拡張を行える

  4. Amebaに所属していた人間が多く、Patriotを利用した集計バッチの追加や運用を行っていたため学習コストが低かった

それぞれ具体的に見ていくと

依存関係が張りやすい

PatriotではGUIで依存関係を設定する機能はありません。ソースコード上で依存関係の設定が必要となります。

以下がサンプルのコードになります。これらは親ジョブが正常終了後に子ジョブが2つが動作する設定となります。 具体的には、親ジョブのソースコード上のproduceという行で自らのproduce名(親であること)を宣言しています。 子ジョブではrequireの行にて親ジョブのproduce名を設定します。 この設定では親ジョブ("hive_repair_table#{date}")が完了後に、子ジョブグループ("hive_convert_for_impala#{date}")の"count_action_daily#{date_}"が実行されます。 produceやrequireの設定はグループ単位でも指定でき、子ジョブのように複数のジョブに依存関係を貼ることもできます。

なお、retrialの行はリトライは3回まで。60秒後にリトライする設定を意味します。 mail_notificationはリトライがすべて失敗した際に送信するメールの宛先です。

// 親ジョブ
produce ["hive_repair_table_#{_date_}"]
  hive_repair_table{
    retrial 'count' => 3, 'interval' => 60
    mail_notification 'to' => 'alert@ML', 'on' => ['failed']
    date "#{_date_}"
    table_name "action_data_table"
  }
// 子ジョブ
job_group{
  require ["hive_repair_table_#{_date_}"]
  produce ["hive_convert_for_impala_#{_date_}"]

  hive_do_query{
  retrial 'count' => 3, 'interval' => 60
  mail_notification 'to' => 'alert@ML', 'on' => ['failed']
  name "count_action_daily_#{_date_}"
  hql "INSERT OVERWRITE TABLE count_action_daily
    PARTITION(pt_date='#{_date_}')
    SELECT count(1)
    FROM action_data_table
    WHERE pt_date = '#{_date_}';"
  }

  hive_do_query{
  retrial 'count' => 3, 'interval' => 60
  mail_notification 'to' => 'alert@ML', 'on' => ['failed']
  name "uu_action_daily#{_date_}"
  hql "INSERT OVERWRITE TABLE count_action_daily
    PARTITION(pt_date='#{_date_}')
    SELECT count(distinct id)
    FROM action_data_table
    WHERE pt_date = '#{_date_}';"
  }

信頼性が高い

これについてはAmeba側での運用実績をもとにしています。実際にPatriot自体の障害やトラブルは運用上ほとんど発生していません。 24時間365日の動作が必要なスケジューラーとしての信頼性としては非常に高いと考えています。

その気になれば柔軟に機能拡張を行える

PatriotではRubyを利用してPluginを自由に作ることができます。 例えば、依存関係の説明の親ジョブのコードで、hive_repair_tableと設定しています。 こちらはPluginで実装を設定しており、テーブル名を渡すと内部的には以下のコードが実行されています。

MSCK REPAIR TABLE #{@table_name}

この仕組を利用することで、共通的な処理は引数を渡すだけで誰でも利用することができます。 例えば、hiveのUDFのimport処理はplugin側に設定しておけば、開発者はimport文を書くこと無くクエリを書くだけで済みます。 ちなみに、CyberZではImpala、sqoop、プロダクト毎に必要なMySQLの処理やシェルの処理についてPluginを作成し運用しています。

Amebaに所属していた人間が多く、Patriotを利用した集計バッチの追加や運用を行っていたため学習コストが低かった

理由としてはこれが一番大きいかもしれません。 CyberZにはAmebaに在籍していたエンジニアが複数います。AmebaではPatriotを定常的に利用していたため、CyberZでの導入の際に学習コストが低く、導入の敷居が非常に低かったです。

実はAzkabanやAirflow、Oozieについても並行して調査は行っていました。しかし、Patriotの信頼性が高さや環境や構成に合わせた拡張が可能な点、そして学習コストの低さという点から、最終的な採用につながりました。

システム構成と冗長化

次に現在のシステム構成と冗長化について説明します。

前編では、workerが2台存在しており、内部の分析用のHadoop環境でのみ利用していました。 現在ではworkerが6台構成となり、分析用のHadoop以外でも利用が進んでいます。 処理としては、1時間毎のバッチ処理、デイリーのバッチ処理、週次のバッチ処理、外部APIコールをPatriotから実行しています。

Patriotにはマスターノードが無く、各workerがMySQLから実行可能なジョブ取得することで動作します。 そのため冗長化する場合は、workerは複数台あれば冗長化されます。仮に1台workerが停止しても残りのworkerが動作していればジョブの実行自体には問題ありません。 MySQLについてはマスター・スレーブ構成としています。

具体的には、

  1. 内部分析のバッチ実行用(Hive集計、Impala集計など)

  2. 画面側集計のバッチ実行用(Hive集計、MySQL集計、サーバー間のデータの転送処理など)

  3. あるPJの1時間毎のバッチ実行用(外部APIコール、AWSのバッチ実行など)

でそれぞれ2台づつ構築しています。

f:id:cyberz-dev-writer:20151222155112p:plain

なお、Patriotのジョブの追加や修正があった場合、JenkinsにてGitHubからソースを取得しデプロイを行っています。 GUIが無いデメリットもありますが、ジョブがすべてソースとして管理できるというメリットもあります。

まとめ

全2回でご紹介したCyberZでのワークフロースケジューラー導入の記事を書きました。 CyberZでも当初はJenkinsを利用してHiveやImpalaのバッチ集計の制御を行っていました。Jenkinsをcron的に利用したり、依存関係を張るPluginを利用することでジョブの制御は可能です。 しかし、集計ジョブの増加や依存関係が複雑になったタイミングでPatriotを導入しました。 導入した結果としては、

  1. 依存関係の制御が容易になった

  2. さまざまなプロジェクトでPatriotを利用するようになった結果、ジョブの管理や進捗がPatriotのみで把握できるようになった。

  3. ジョブの登録や管理が共通化されたので、Patriotを触れるエンジニアが増えたことで、業務コストや心理的な負担(「バッチがコケたどうしよう」「リトライどうやるっけ??」)が減った

と、Patriotの導入は成功だったと考えています。

ワークフロースケジューラーといえばAzkabanやAirflowなどが有名ですが、CyberAgentのOSSのPatriotへ興味をもってもらうきっかけになればと思ってます。

P.S. ちなみに、Twitterやグループ内のエンジニアから軽くつっこみを頂いたJenkinsでバッチのキックが残っている件ですがこちらは残念ながら移行できておりません。 実はこのJenkinsは分析用HadoopのHiveクエリのジョブです。「全然ジョブ管理が一元化されてないじゃん!!」ってツッコミあると思いますが、、、来年には移行したいなと思ってます。

それでは来年もどうぞCyberZ公式エンジニアブログをよろしくお願いいたします。