Apache NIFIでAzure SQL Databaseと連携する

◆ Live配信スケジュール ◆
サイオステクノロジーでは、Microsoft MVPの武井による「わかりみの深いシリーズ」など、定期的なLive配信を行っています。
⇒ 詳細スケジュールはこちらから
⇒ 見逃してしまった方はYoutubeチャンネルをご覧ください
【4/18開催】VSCode Dev Containersで楽々開発環境構築祭り〜Python/Reactなどなど〜
Visual Studio Codeの拡張機能であるDev Containersを使ってReactとかPythonとかSpring Bootとかの開発環境をラクチンで構築する方法を紹介するイベントです。
https://tech-lab.connpass.com/event/311864/

データフローオーケストレーションツールのApache NiFiを試したいと思います。

Apache NiFi とは

Apache NiFiは、システム間のデータフローを自動化し管理するためのオープンソースのツールです。
NiFiはデータストリームをリアルタイム的に処理を行う機能を実現しています。分散処理やIoTのように膨大なデータを処理するために必要な機能を備えているツールです。

主な特徴は以下になります。

  • WebベースUI
  • 高い設定性
  • データの来歴
  • 拡張性できる設計
  • セキュア

詳細は以下をご覧頂ければと思います。
https://nifi.apache.org/

前提動作環境

本記事での動作環境は下記になります。

  • Apache NiFi 1.6.0
  • Windows Server 2012 R2   ※事前にインストールしておきます。
  • jdk-8u172-windows-x64.exe ※事前にインストールしておきます。
  • Azure SQL Database ※事前に作成しておきます。
    • DB名:pdi
    • スキーマ名:dbo
    • テーブル名:test_table
      • 項目定義:time(smalldatetime型)、name(ntext型)、age(smallint型)

Apache NiFi のインストール

Apache NiFiのダウンロードサイトから、「nifi-1.6.0-bin.zip」をダウンロードします。zipファイルの中の「nifi-1.6.0」を「C:\」配下にコピーし、フォルダ名を「nifi160」にリネーム変更します。

C:\nifi160\bin\run_nifi.bat を実行すれば起動します。
しばらくして、ブラウザでhttps://localhost:8080/nifiにアクセスし、Web UIを表示できることを確認します。

今回作成する処理フロー

今回作成する処理フローのシナリオは、「ローカルのJSONファイルの取得 → JSONデータをSQLに変更 → Azure SQL Databeseにデータ挿入」といった流れにします。

使用するProcessorは、以下の4つになります。
[GetFile]、[ConvertJSONToSQL]、[PutSQL]、[LogAttribute]

こちらの4つのProcessorを使って、以下の[全体処理フローイメージ図]のように作成します。
input

ここで、Apach NiFiの用語について、補足します。

NiFi用語説明
フローファイル、FlowFileシステム内を移動するデータ、NiFiに取り込まれたデータ。このデータは属性(Attributes)とコンテンツを保持する。
プロセッサ、Processor外部ソースからデータを取得するツール、フローファイルの属性やコンテンツに対しアクションを実行し、外部ソースへとデータを発信する。
コネクション、Connectionプロセッサ間のリンク、データの行き先を決めるキューとリレーションシップを保持する。
フローコントローラ、Flow Controllerプロセッサ間のフローファイル転送を実現するブローカとして振る舞う。
プロセスグループ、Process Groupプロセッサ、ファンネルなどを組み合わせ、新規コンポーネントの作成を可能とする

以降で、各設定方法を説明します。

各プロセッサの設定

[GetFile]

[GetFile]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

InputDirectory: ${jsonファイルを配置したフォルダパス}

インポート対象のjsonファイルは、以下のデータをUTF8で用意します。

[{"time":"2018-05-21 15:22:22.147","name":"サイオス太郎","age":"20"},{"time":"2018-05-21 17:22:22.147","name":"ビッグデータ太郎","age":"22"}]

[ConvertJSONToSQL]

[ConvertJSONToSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

JDBC Connection Pool: DBCPConnectionPool
Statement Type: INSERT
Table Name: test_table
Schema Name: dbo
Translate Field Names: false

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: originalにチェック

「JDBC Connection Pool」の「Value」をセットした際に右隣の列にあらわれた「→」をクリックし、DB接続の「DBCPConnectionPool」を設定できる一覧画面に遷移します。
一覧画面から、「DBCPConnectionPool」の右端の方にある設定アイコンをクリックします。

以下のPROPERTIESタブを開きます。
input

こちらからダウンロードしたAzure SQL DatabaseのJDBCドライバを、適宜フォルダに配置しておきます。

以下を設定します。

Database Connection URL: jdbc:sqlserver://${Azure SQL DatabaseのDB名}.database.windows.net:1433;database=pdi;user=${your_username_here}@etl-db;password=${your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
Database Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver
Database Driver Location(s): ${JDBCドライバを配置したフォルダパス}

[PutSQL]

[PutSQL]をダブルクリックします。

以下のPROPERTIESタブを開きます。
input
以下を設定します。

JDBC Connection Pool: DBCPConnectionPool

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: retryとsuccessにチェック

[LogAttribute]

[LogAttribute]をダブルクリックします。

PROPERTIESタブを開いて、以下のみ設定します。

Log Level:debug

SETTINGSタブを開いて以下を設定します。

Automatically Terminate Relationships: successにチェック

プロセッサをつなぐ

NiFiのデータフローは、プロセッサをRelationshipでつなぐことで流れをつくっていきます。 プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。

先述の[全体処理フローイメージ図]のとおりに、4つのプロセッサをRelationshipでつなぎます。 接続元のプロセッサが複数の出力Relationshipを持っている場合(success、failure、original、retryなど)、確認ダイアログが表示されます。
ここでは、正常ルートを定義するので、[全体処理フローイメージ図]にしたがって適宜選択します。

LogAttributeにFailureを流す

プロセッサ設定の不備などで、処理が失敗した場合、渡ってきたFlowFileはfailureに流されるため、ログで確認できるようfailureを[LogAttribute]につなぎます。

不要なRelationshipをAuto-Terminateする

NiFiの内部を流れるFlowFileは、行き先がなくなる(終点までたどり着く)と、削除されます。終端のプロセッサでは、それ以上FlowFileを扱う必要がないので、Relationshipの先がありません。
不要なRelationshipは、プロセッサのSETTINGSタブから、上記「プロセッサの設定」にて、設定しているように、Automatically Terminate Relationshipsでチェックします。

フローの実行

全ての設定が完了した後、フローを実行します。
フローの実行は、キャンバスの空白部分をクリックし、選択を解除してから、以下の操作パレットのスタートアイコンをクリックすると、すべてのプロセッサが起動します。
input

しばらくすると、NiFiのWeb UIに、[GetFile]から[PutSQL]までの正常ルートをデータが通ったことが確認できます。
input

Azure SQL Databaseにログインし、以下のとおり、SQLのSELECT文を発行し、データが登録されていることを確認しましょう。
input

まとめ

Web UI でのフロー作成だけで、ファイルからAzure SQL Databaseへとデータ連携できることを紹介しました。

斎藤@SSTD

アバター画像
About サイオステクノロジーの中の人 10 Articles
サイオステクノロジーで働く中の人です。
ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。


ご覧いただきありがとうございます。
ブログの最新情報はSNSでも発信しております。
ぜひTwitterのフォロー&Facebookページにいいねをお願い致します!



>> 雑誌等の執筆依頼を受付しております。
   ご希望の方はお気軽にお問い合わせください!

Be the first to comment

Leave a Reply

Your email address will not be published.


*


質問はこちら 閉じる