データフローオーケストレーションツールのApache NiFiを試したいと思います。
Apache NiFi とは
Apache NiFiは、システム間のデータフローを自動化し管理するためのオープンソースのツールです。
NiFiはデータストリームをリアルタイム的に処理を行う機能を実現しています。分散処理やIoTのように膨大なデータを処理するために必要な機能を備えているツールです。
主な特徴は以下になります。
- WebベースUI
- 高い設定性
- データの来歴
- 拡張性できる設計
- セキュア
詳細は以下をご覧頂ければと思います。
https://nifi.apache.org/
前提動作環境
本記事での動作環境は下記になります。
- Apache NiFi 1.6.0
- Windows Server 2012 R2 ※事前にインストールしておきます。
- jdk-8u172-windows-x64.exe ※事前にインストールしておきます。
- Azure SQL Database ※事前に作成しておきます。
- DB名:pdi
- スキーマ名:dbo
- テーブル名:test_table
- 項目定義:time(smalldatetime型)、name(ntext型)、age(smallint型)
Apache NiFi のインストール
Apache NiFiのダウンロードサイトから、「nifi-1.6.0-bin.zip」をダウンロードします。zipファイルの中の「nifi-1.6.0」を「C:\」配下にコピーし、フォルダ名を「nifi160」にリネーム変更します。
C:\nifi160\bin\run_nifi.bat を実行すれば起動します。
しばらくして、ブラウザでhttps://localhost:8080/nifiにアクセスし、Web UIを表示できることを確認します。
今回作成する処理フロー
今回作成する処理フローのシナリオは、「ローカルのJSONファイルの取得 → JSONデータをSQLに変更 → Azure SQL Databeseにデータ挿入」といった流れにします。
使用するProcessorは、以下の4つになります。
[GetFile]、[ConvertJSONToSQL]、[PutSQL]、[LogAttribute]
こちらの4つのProcessorを使って、以下の[全体処理フローイメージ図]のように作成します。
ここで、Apach NiFiの用語について、補足します。
NiFi用語 | 説明 |
---|---|
フローファイル、FlowFile | システム内を移動するデータ、NiFiに取り込まれたデータ。このデータは属性(Attributes)とコンテンツを保持する。 |
プロセッサ、Processor | 外部ソースからデータを取得するツール、フローファイルの属性やコンテンツに対しアクションを実行し、外部ソースへとデータを発信する。 |
コネクション、Connection | プロセッサ間のリンク、データの行き先を決めるキューとリレーションシップを保持する。 |
フローコントローラ、Flow Controller | プロセッサ間のフローファイル転送を実現するブローカとして振る舞う。 |
プロセスグループ、Process Group | プロセッサ、ファンネルなどを組み合わせ、新規コンポーネントの作成を可能とする |
以降で、各設定方法を説明します。
各プロセッサの設定
[GetFile]
[GetFile]をダブルクリックします。以下のPROPERTIESタブを開きます。
以下を設定します。
InputDirectory: ${jsonファイルを配置したフォルダパス}
インポート対象のjsonファイルは、以下のデータをUTF8で用意します。
[{"time":"2018-05-21 15:22:22.147","name":"サイオス太郎","age":"20"},{"time":"2018-05-21 17:22:22.147","name":"ビッグデータ太郎","age":"22"}]
[ConvertJSONToSQL]
[ConvertJSONToSQL]をダブルクリックします。以下のPROPERTIESタブを開きます。
以下を設定します。
JDBC Connection Pool: DBCPConnectionPool Statement Type: INSERT Table Name: test_table Schema Name: dbo Translate Field Names: false
SETTINGSタブを開いて以下を設定します。
Automatically Terminate Relationships: originalにチェック
「JDBC Connection Pool」の「Value」をセットした際に右隣の列にあらわれた「→」をクリックし、DB接続の「DBCPConnectionPool」を設定できる一覧画面に遷移します。
一覧画面から、「DBCPConnectionPool」の右端の方にある設定アイコンをクリックします。
以下のPROPERTIESタブを開きます。
こちらからダウンロードしたAzure SQL DatabaseのJDBCドライバを、適宜フォルダに配置しておきます。
以下を設定します。
Database Connection URL: jdbc:sqlserver://${Azure SQL DatabaseのDB名}.database.windows.net:1433;database=pdi;user=${your_username_here}@etl-db;password=${your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30; Database Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver Database Driver Location(s): ${JDBCドライバを配置したフォルダパス}
[PutSQL]
[PutSQL]をダブルクリックします。以下のPROPERTIESタブを開きます。
以下を設定します。
JDBC Connection Pool: DBCPConnectionPool
SETTINGSタブを開いて以下を設定します。
Automatically Terminate Relationships: retryとsuccessにチェック
[LogAttribute]
[LogAttribute]をダブルクリックします。PROPERTIESタブを開いて、以下のみ設定します。
Log Level:debug
SETTINGSタブを開いて以下を設定します。
Automatically Terminate Relationships: successにチェック
プロセッサをつなぐ
NiFiのデータフローは、プロセッサをRelationshipでつなぐことで流れをつくっていきます。 プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。
先述の[全体処理フローイメージ図]のとおりに、4つのプロセッサをRelationshipでつなぎます。 接続元のプロセッサが複数の出力Relationshipを持っている場合(success、failure、original、retryなど)、確認ダイアログが表示されます。
ここでは、正常ルートを定義するので、[全体処理フローイメージ図]にしたがって適宜選択します。
LogAttributeにFailureを流す
プロセッサ設定の不備などで、処理が失敗した場合、渡ってきたFlowFileはfailureに流されるため、ログで確認できるようfailureを[LogAttribute]につなぎます。
不要なRelationshipをAuto-Terminateする
NiFiの内部を流れるFlowFileは、行き先がなくなる(終点までたどり着く)と、削除されます。終端のプロセッサでは、それ以上FlowFileを扱う必要がないので、Relationshipの先がありません。
不要なRelationshipは、プロセッサのSETTINGSタブから、上記「プロセッサの設定」にて、設定しているように、Automatically Terminate Relationshipsでチェックします。
フローの実行
全ての設定が完了した後、フローを実行します。
フローの実行は、キャンバスの空白部分をクリックし、選択を解除してから、以下の操作パレットのスタートアイコンをクリックすると、すべてのプロセッサが起動します。
しばらくすると、NiFiのWeb UIに、[GetFile]から[PutSQL]までの正常ルートをデータが通ったことが確認できます。
Azure SQL Databaseにログインし、以下のとおり、SQLのSELECT文を発行し、データが登録されていることを確認しましょう。
まとめ
Web UI でのフロー作成だけで、ファイルからAzure SQL Databaseへとデータ連携できることを紹介しました。
斎藤@SSTD