Digdagとは
TreasureData社がオープンソースで公開しているツールで、依存関係のある複数のタスクを実行するワークフローエンジンです。
プログラマではなくてもわかりやすいよう、YAMLに対し、DSLにて、DAGの構造で、ワークフローを定義することができます。
ワークフローエンジンに必要な、以下の機能が備わっています。
- タスクの依存関係
- スケジューリング
- エラー処理
- リトライ
- メール通知
- タスクの並列実行
- タスク実行ログの収集
Github: https://github.com/treasure-data/digdag/
Document: https://docs.digdag.io/
導入方法
インストールからジョブを設定するところまでをみていきます。
1.動作環境
動作環境は下記になります。
- Azureu上仮想マシンでCentOS 6.8
- jdk-8u112-linux-x64.rpm
- Digdag 0.8.22
2.インストールと起動
Digdagをインストールします。
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" $ chmod +x ~/bin/digdag $ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
次に、Digdagをサーバーとして起動できるよう、「~/.config/digdag/」直下に、以下の設定ファイルを2つ作成します。
「digdag.properties」の作成例
database.type=h2 database.path=/home/cent/bin/test.db digdag.secret-access-policy-file=/home/cent/.config/digdag/secret-access-policy.yaml #「https://docs.digdag.io/command_reference.html#secret-encryption-key」と同じ設定を指定 digdag.secret-encryption-key=MDEyMzQ1Njc4OTAxMjM0NQ==
「secret-access-policy.yaml」の作成
以下のリンク先と同じ設定を指定する
それから、作成後に以下のコマンドでDigdagのサーバを起動します。
cd ~/bin/ digdag server -c ~/.config/digdag/digdag.properties -L ~/bin/logs/server.log -l debug -O ~/bin/logs -A ~/bin/logs &
3.ワークフローの作成
TreasureDataにある2つのテーブルにて、一方のテーブルに、他方のテーブルからクエリ実行で取得したレコードを挿入するワークフローを作成します。
ワークフローには、以下のタスクを定義します。
- レコード挿入前に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
- 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
- TreasureDataに保存してあるジョブクエリを実行する(ジョブクエリは、一方のテーブルに、他方のテーブルにクエリを実行して取得したレコードを挿入するクエリ)
- レコード挿入後に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
- 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
ワークフロー実装例「spike.dig」
timezone: Asia/Tokyo schedule: cron>: 30 14 7 12 * _export: td: database: spikedb mail: port: 465 from: "" to: [""] debug: true _error: mail>: ./tmpl/err.txt subject: 【digdag mail test】err mail +step1: td>: ./sql/step1.sql store_last_results: true engine: hive +step1-1: mail>: ./tmpl/info_step1.txt subject: 【digdag mail test】step1 info mail +step2: td_run>: spikedb_spiketbl002_select _all +step3: td>: ./sql/step3.sql store_last_results: true engine: hive +step3-1: mail>: ./tmpl/info_step3.txt subject: 【digdag mail test】step3 info mail
クエリ作成例「step1.sql」
select count(*) AS cnt1 from spiketbl001
メール本文テンプレート作成例「info_step1.txt」
${moment().format("YYYYMMDD HH:mm:ss")} ----------- information ----------- step1 sql count=${td.last_results.cnt1}
なお、今回サンプルとして作成するワークフローのDigdagプロジェクト「spike」は、「~/bin/」直下に作成し、フォルダツリーは以下のようにします。
$ tree spike spike/ ├─spike.dig │ ├─sql │ ├── step1.sql │ └── step3.sql │ └─tmpl ├── err.txt ├── info_step1.txt └── info_step3.txt
次に、以下のコマンドにて、サーバに今回作成したワークフローを登録します。コマンドは、カレントディレクトリをプロジェクトフォルダに移動してから実行します。
cd ~/bin/spike digdag push spike
さらに、以下のコマンドにて、サーバに今回作成したワークフロー中で使用される「secrets」パラメータの設定を登録します。パラメータの設定は、json形式でファイルに定義できるため、作成した定義ファイル「secrets.json」をコマンド引数に指定しています。
cd ~/bin/ digdag secrets --project spike --set @secrets.json
スケジュールの登録状況は、以下のコマンドで確認できます。
cd ~/bin/spike digdag schedules
当該ワークフローの実行状況は、以下のコマンドで確認できます。
digdag sessions spike
まとめ
Digdagは、以下の特徴があります。
- DAG形式のワークフローが作成できる
- フロー制御や外部のスクリプト呼出しができプログラマブルなワークフローにできる
- Treasure Dataの機能が使える
- ワークフローで使用する環境変数などのパラメータを埋込みではなく外出しにできる
- ワークフローの実行時間超過・失敗時のエラー通知ができる
- タスクの並列実行機能がある
データ分析基盤の新規構築や見直しの際に、本記事のような、OSSのワークフロー管理ツールの導入し、
ETL処理、DBの定期更新や定型レポートの作成など、一連の定型処理を自動化すれば、
リードタイム・コストダウンの削減につながります。