Azure VM上のDigdagでワークフロー管理を行う

Digdagとは

TreasureData社がオープンソースで公開しているツールで、依存関係のある複数のタスクを実行するワークフローエンジンです。
プログラマではなくてもわかりやすいよう、YAMLに対し、DSLにて、DAGの構造で、ワークフローを定義することができます。

ワークフローエンジンに必要な、以下の機能が備わっています。

  • タスクの依存関係
  • スケジューリング
  • エラー処理
  • リトライ
  • メール通知
  • タスクの並列実行
  • タスク実行ログの収集

Github: https://github.com/treasure-data/digdag/
Document: https://docs.digdag.io/

導入方法

インストールからジョブを設定するところまでをみていきます。

1.動作環境

動作環境は下記になります。

  • Azureu上仮想マシンでCentOS 6.8
  • jdk-8u112-linux-x64.rpm
  • Digdag 0.8.22

2.インストールと起動

Digdagをインストールします。

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc

次に、Digdagをサーバーとして起動できるよう、「~/.config/digdag/」直下に、以下の設定ファイルを2つ作成します。

「digdag.properties」の作成例

database.type=h2
database.path=/home/cent/bin/test.db
digdag.secret-access-policy-file=/home/cent/.config/digdag/secret-access-policy.yaml
#「https://docs.digdag.io/command_reference.html#secret-encryption-key」と同じ設定を指定
digdag.secret-encryption-key=MDEyMzQ1Njc4OTAxMjM0NQ==

「secret-access-policy.yaml」の作成
以下のリンク先と同じ設定を指定する

secret-access-policy.yaml

それから、作成後に以下のコマンドでDigdagのサーバを起動します。

cd ~/bin/
digdag server -c ~/.config/digdag/digdag.properties -L ~/bin/logs/server.log -l debug -O ~/bin/logs -A ~/bin/logs &

3.ワークフローの作成

TreasureDataにある2つのテーブルにて、一方のテーブルに、他方のテーブルからクエリ実行で取得したレコードを挿入するワークフローを作成します。
ワークフローには、以下のタスクを定義します。

  1. レコード挿入前に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
  2. 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
  3. TreasureDataに保存してあるジョブクエリを実行する(ジョブクエリは、一方のテーブルに、他方のテーブルにクエリを実行して取得したレコードを挿入するクエリ)
  4. レコード挿入後に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
  5. 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する

ワークフロー実装例「spike.dig」

timezone: Asia/Tokyo

schedule:
cron>: 30 14 7 12 *

_export:
td:
database: spikedb
mail:
port: 465
from: ""
to: [""]
debug: true

_error:
mail>: ./tmpl/err.txt
subject: 【digdag mail test】err mail

+step1:
td>: ./sql/step1.sql
store_last_results: true
engine: hive

+step1-1:
mail>: ./tmpl/info_step1.txt
subject: 【digdag mail test】step1 info mail

+step2:
td_run>: spikedb_spiketbl002_select _all

+step3:
td>: ./sql/step3.sql
store_last_results: true
engine: hive

+step3-1:
mail>: ./tmpl/info_step3.txt
subject: 【digdag mail test】step3 info mail

クエリ作成例「step1.sql」

select count(*) AS cnt1 from spiketbl001

メール本文テンプレート作成例「info_step1.txt」

${moment().format("YYYYMMDD HH:mm:ss")}
-----------
information
-----------
step1 sql count=${td.last_results.cnt1}

なお、今回サンプルとして作成するワークフローのDigdagプロジェクト「spike」は、「~/bin/」直下に作成し、フォルダツリーは以下のようにします。

$ tree spike
spike/
├─spike.dig
│
├─sql
│ ├── step1.sql
│ └── step3.sql
│
└─tmpl
├── err.txt
├── info_step1.txt
└── info_step3.txt

次に、以下のコマンドにて、サーバに今回作成したワークフローを登録します。コマンドは、カレントディレクトリをプロジェクトフォルダに移動してから実行します。

cd ~/bin/spike
digdag push spike

さらに、以下のコマンドにて、サーバに今回作成したワークフロー中で使用される「secrets」パラメータの設定を登録します。パラメータの設定は、json形式でファイルに定義できるため、作成した定義ファイル「secrets.json」をコマンド引数に指定しています。

cd ~/bin/
digdag secrets --project spike --set @secrets.json

スケジュールの登録状況は、以下のコマンドで確認できます。

cd ~/bin/spike
digdag schedules

当該ワークフローの実行状況は、以下のコマンドで確認できます。

digdag sessions spike

まとめ

Digdagは、以下の特徴があります。

  • DAG形式のワークフローが作成できる
  • フロー制御や外部のスクリプト呼出しができプログラマブルなワークフローにできる
  • Treasure Dataの機能が使える
  • ワークフローで使用する環境変数などのパラメータを埋込みではなく外出しにできる
  • ワークフローの実行時間超過・失敗時のエラー通知ができる
  • タスクの並列実行機能がある

データ分析基盤の新規構築や見直しの際に、本記事のような、OSSのワークフロー管理ツールの導入し、
ETL処理、DBの定期更新や定型レポートの作成など、一連の定型処理を自動化すれば、
リードタイム・コストダウンの削減につながります。

斎藤@SSTD

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です