Apache NIFIでAzure SQL Databaseと連携する

データフローオーケストレーションツールのApache NiFiを試したいと思います。

Apache NiFi とは

Apache NiFiは、システム間のデータフローを自動化し管理するためのオープンソースのツールです。
NiFiはデータストリームをリアルタイム的に処理を行う機能を実現しています。分散処理やIoTのように膨大なデータを処理するために必要な機能を備えているツールです。

主な特徴は以下になります。

  • WebベースUI
  • 高い設定性
  • データの来歴
  • 拡張性できる設計
  • セキュア

詳細は以下をご覧頂ければと思います。
https://nifi.apache.org/

前提動作環境

本記事での動作環境は下記になります。

  • Apache NiFi 1.6.0
  • Windows Server 2012 R2   ※事前にインストールしておきます。
  • jdk-8u172-windows-x64.exe ※事前にインストールしておきます。
  • Azure SQL Database ※事前に作成しておきます。
    • DB名:pdi
    • スキーマ名:dbo
    • テーブル名:test_table
      • 項目定義:time(smalldatetime型)、name(ntext型)、age(smallint型)

Apache NiFi のインストール

Apache NiFiのダウンロードサイトから、「nifi-1.6.0-bin.zip」をダウンロードします。zipファイルの中の「nifi-1.6.0」を「C:\」配下にコピーし、フォルダ名を「nifi160」にリネーム変更します。

C:\nifi160\bin\run_nifi.bat を実行すれば起動します。
しばらくして、ブラウザでhttp://localhost:8080/nifiにアクセスし、Web UIを表示できることを確認します。

今回作成する処理フロー

今回作成する処理フローのシナリオは、「ローカルのJSONファイルの取得 → JSONデータをSQLに変更 → Azure SQL Databeseにデータ挿入」といった流れにします。

使用するProcessorは、以下の4つになります。
[GetFile]、[ConvertJSONToSQL]、[PutSQL]、[LogAttribute]

こちらの4つのProcessorを使って、以下の[全体処理フローイメージ図]のように作成します。
input

ここで、Apach NiFiの用語について、補足します。

NiFi用語 説明
フローファイル、FlowFile システム内を移動するデータ、NiFiに取り込まれたデータ。このデータは属性(Attributes)とコンテンツを保持する。
プロセッサ、Processor 外部ソースからデータを取得するツール、フローファイルの属性やコンテンツに対しアクションを実行し、外部ソースへとデータを発信する。
コネクション、Connection プロセッサ間のリンク、データの行き先を決めるキューとリレーションシップを保持する。
フローコントローラ、Flow Controller プロセッサ間のフローファイル転送を実現するブローカとして振る舞う。
プロセスグループ、Process Group プロセッサ、ファンネルなどを組み合わせ、新規コンポーネントの作成を可能とする

以降で、各設定方法を説明します。

各プロセッサの設定

[GetFile]

[GetFile]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

InputDirectory: ${jsonファイルを配置したフォルダパス}

インポート対象のjsonファイルは、以下のデータをUTF8で用意します。

[{"time":"2018-05-21 15:22:22.147","name":"サイオス太郎","age":"20"},{"time":"2018-05-21 17:22:22.147","name":"ビッグデータ太郎","age":"22"}]

[ConvertJSONToSQL]

[ConvertJSONToSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

JDBC Connection Pool: DBCPConnectionPool
Statement Type: INSERT
Table Name: test_table
Schema Name: dbo
Translate Field Names: false

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: originalにチェック

「JDBC Connection Pool」の「Value」をセットした際に右隣の列にあらわれた「→」をクリックし、DB接続の「DBCPConnectionPool」を設定できる一覧画面に遷移します。
一覧画面から、「DBCPConnectionPool」の右端の方にある設定アイコンをクリックします。

以下のPROPERTIESタブを開きます。
input

こちらからダウンロードしたAzure SQL DatabaseのJDBCドライバを、適宜フォルダに配置しておきます。

以下を設定します。

Database Connection URL: jdbc:sqlserver://${Azure SQL DatabaseのDB名}.database.windows.net:1433;database=pdi;user=${your_username_here}@etl-db;password=${your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
Database Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver
Database Driver Location(s): ${JDBCドライバを配置したフォルダパス}

[PutSQL]

[PutSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

JDBC Connection Pool: DBCPConnectionPool

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: retryとsuccessにチェック

[LogAttribute]

[LogAttribute]をダブルクリックします。

PROPERTIESタブを開いて、以下のみ設定します。

Log Level:debug

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: successにチェック

プロセッサをつなぐ

NiFiのデータフローは、プロセッサをRelationshipでつなぐことで流れをつくっていきます。 プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。

先述の[全体処理フローイメージ図]のとおりに、4つのプロセッサをRelationshipでつなぎます。 接続元のプロセッサが複数の出力Relationshipを持っている場合(success、failure、original、retryなど)、確認ダイアログが表示されます。
ここでは、正常ルートを定義するので、[全体処理フローイメージ図]にしたがって適宜選択します。

LogAttributeにFailureを流す

プロセッサ設定の不備などで、処理が失敗した場合、渡ってきたFlowFileはfailureに流されるため、ログで確認できるようfailureを[LogAttribute]につなぎます。

不要なRelationshipをAuto-Terminateする

NiFiの内部を流れるFlowFileは、行き先がなくなる(終点までたどり着く)と、削除されます。終端のプロセッサでは、それ以上FlowFileを扱う必要がないので、Relationshipの先がありません。
不要なRelationshipは、プロセッサのSETTINGSタブから、上記「プロセッサの設定」にて、設定しているように、Automatically Terminate Relationshipsでチェックします。

フローの実行

全ての設定が完了した後、フローを実行します。
フローの実行は、キャンバスの空白部分をクリックし、選択を解除してから、以下の操作パレットのスタートアイコンをクリックすると、すべてのプロセッサが起動します。
input

しばらくすると、NiFiのWeb UIに、[GetFile]から[PutSQL]までの正常ルートをデータが通ったことが確認できます。
input

Azure SQL Databaseにログインし、以下のとおり、SQLのSELECT文を発行し、データが登録されていることを確認しましょう。
input

まとめ

Web UI でのフロー作成だけで、ファイルからAzure SQL Databaseへとデータ連携できることを紹介しました。

斎藤@SSTD

Be the first to comment

コメント投稿

Your email address will not be published.


*