Apache NIFIでAzure SQL Databaseと連携する

データフローオーケストレーションツールのApache NiFiを試したいと思います。

Apache NiFi とは

Apache NiFiは、システム間のデータフローを自動化し管理するためのオープンソースのツールです。
NiFiはデータストリームをリアルタイム的に処理を行う機能を実現しています。分散処理やIoTのように膨大なデータを処理するために必要な機能を備えているツールです。

主な特徴は以下になります。

  • WebベースUI
  • 高い設定性
  • データの来歴
  • 拡張性できる設計
  • セキュア

詳細は以下をご覧頂ければと思います。
https://nifi.apache.org/

前提動作環境

本記事での動作環境は下記になります。

  • Apache NiFi 1.6.0
  • Windows Server 2012 R2   ※事前にインストールしておきます。
  • jdk-8u172-windows-x64.exe ※事前にインストールしておきます。
  • Azure SQL Database ※事前に作成しておきます。
    • DB名:pdi
    • スキーマ名:dbo
    • テーブル名:test_table
      • 項目定義:time(smalldatetime型)、name(ntext型)、age(smallint型)

Apache NiFi のインストール

Apache NiFiのダウンロードサイトから、「nifi-1.6.0-bin.zip」をダウンロードします。zipファイルの中の「nifi-1.6.0」を「C:\」配下にコピーし、フォルダ名を「nifi160」にリネーム変更します。

C:\nifi160\bin\run_nifi.bat を実行すれば起動します。
しばらくして、ブラウザでhttp://localhost:8080/nifiにアクセスし、Web UIを表示できることを確認します。

今回作成する処理フロー

今回作成する処理フローのシナリオは、「ローカルのJSONファイルの取得 → JSONデータをSQLに変更 → Azure SQL Databeseにデータ挿入」といった流れにします。

使用するProcessorは、以下の4つになります。
[GetFile]、[ConvertJSONToSQL]、[PutSQL]、[LogAttribute]

こちらの4つのProcessorを使って、以下の[全体処理フローイメージ図]のように作成します。
input

ここで、Apach NiFiの用語について、補足します。

NiFi用語 説明
フローファイル、FlowFile システム内を移動するデータ、NiFiに取り込まれたデータ。このデータは属性(Attributes)とコンテンツを保持する。
プロセッサ、Processor 外部ソースからデータを取得するツール、フローファイルの属性やコンテンツに対しアクションを実行し、外部ソースへとデータを発信する。
コネクション、Connection プロセッサ間のリンク、データの行き先を決めるキューとリレーションシップを保持する。
フローコントローラ、Flow Controller プロセッサ間のフローファイル転送を実現するブローカとして振る舞う。
プロセスグループ、Process Group プロセッサ、ファンネルなどを組み合わせ、新規コンポーネントの作成を可能とする

以降で、各設定方法を説明します。

各プロセッサの設定

[GetFile]

[GetFile]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

インポート対象のjsonファイルは、以下のデータをUTF8で用意します。

[ConvertJSONToSQL]

[ConvertJSONToSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

SETTINGSタブを開いて以下を設定します。

「JDBC Connection Pool」の「Value」をセットした際に右隣の列にあらわれた「→」をクリックし、DB接続の「DBCPConnectionPool」を設定できる一覧画面に遷移します。
一覧画面から、「DBCPConnectionPool」の右端の方にある設定アイコンをクリックします。

以下のPROPERTIESタブを開きます。
input

こちらからダウンロードしたAzure SQL DatabaseのJDBCドライバを、適宜フォルダに配置しておきます。

以下を設定します。

[PutSQL]

[PutSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

SETTINGSタブを開いて以下を設定します。

[LogAttribute]

[LogAttribute]をダブルクリックします。

PROPERTIESタブを開いて、以下のみ設定します。

SETTINGSタブを開いて以下を設定します。

プロセッサをつなぐ

NiFiのデータフローは、プロセッサをRelationshipでつなぐことで流れをつくっていきます。 プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。

先述の[全体処理フローイメージ図]のとおりに、4つのプロセッサをRelationshipでつなぎます。 接続元のプロセッサが複数の出力Relationshipを持っている場合(success、failure、original、retryなど)、確認ダイアログが表示されます。
ここでは、正常ルートを定義するので、[全体処理フローイメージ図]にしたがって適宜選択します。

LogAttributeにFailureを流す

プロセッサ設定の不備などで、処理が失敗した場合、渡ってきたFlowFileはfailureに流されるため、ログで確認できるようfailureを[LogAttribute]につなぎます。

不要なRelationshipをAuto-Terminateする

NiFiの内部を流れるFlowFileは、行き先がなくなる(終点までたどり着く)と、削除されます。終端のプロセッサでは、それ以上FlowFileを扱う必要がないので、Relationshipの先がありません。
不要なRelationshipは、プロセッサのSETTINGSタブから、上記「プロセッサの設定」にて、設定しているように、Automatically Terminate Relationshipsでチェックします。

フローの実行

全ての設定が完了した後、フローを実行します。
フローの実行は、キャンバスの空白部分をクリックし、選択を解除してから、以下の操作パレットのスタートアイコンをクリックすると、すべてのプロセッサが起動します。
input

しばらくすると、NiFiのWeb UIに、[GetFile]から[PutSQL]までの正常ルートをデータが通ったことが確認できます。
input

Azure SQL Databaseにログインし、以下のとおり、SQLのSELECT文を発行し、データが登録されていることを確認しましょう。
input

まとめ

Web UI でのフロー作成だけで、ファイルからAzure SQL Databaseへとデータ連携できることを紹介しました。

斎藤@SSTD

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメント投稿

メールアドレスは表示されません。


*