Azure Blob Storage上のCSVファイルをEmbulkでMySQLにインポート

こんにちは、サイオステクノロジーの森谷です。
今回はAzure上で並列データ転送ツールのEmbulkを動かしてみたいと思います。

Embulkとは

EmbulkはTreasure Data社がオープンソースで公開している並列データ転送ツールです。
プラグイン型のアーキテクチャを採用しており様々なファイルフォーマットやストレージに対応可能な点、処理を複数のタスクに分割し並列実行する点などが特徴です。
リアルタイムに入出力を行うFluentdとは異なり、単体の実行、あるいは日次や1時間毎といったバッチ処理に特化しています。
要するに、Fluentdのバッチ処理版になります。

今回は、このEmbulkを用い、Azureのストレージ上のCSVファイルを取得し、内容をRDBに格納します。
以下にその大まかな構成を示します。

今回の構成の概略図

この記事では、以下の項目について解説します。

  • Azure Blob Storageの作成
  • MySQL データベースの作成
  • Embulkのインストール
  • Embulkの設定・実行

GitHub: https://github.com/embulk/embulk
Document: https://www.embulk.org/docs/

Azure Blob Storageの準備

データベースに入力するファイルの配置先として、今回はAzureのストレージサービスであるAzure Storageを利用します。
Azure Storageのサービスを利用するためにはストレージ アカウントが必要です。
今回は2種類あるストレージ アカウントの内、Blob ストレージ アカウントを選択します。
これは汎用ストレージ アカウントと異なり、Azure StorageのサービスのうちBlob Storageのみに特化したアカウントで、他のサービスは利用できません。
その反面、ホット/クールの2段階のアクセス レベルや耐久性など、汎用ストレージ アカウントに比べ高機能となっています。

ストレージ アカウントを作成する場合はAzure ポータルで「新規」>「Storage」>「ストレージ アカウント」と選択します。

「新規」>「Storage」を選択した図

表示された「ストレージ アカウントの作成」画面で、必要なパラメータを入力します。
このとき「アカウントの種類」で「BLOB ストレージ」を選択することを忘れないようにしましょう。
パラメータ入力画面。「BLOB ストレージ」の選択を忘れずに

ストレージ アカウントを作成したら、次はストレージ内にコンテナーを新規作成します。
作成したストレージ アカウントを選択し、「概要」の上部に+マークと共にある「コンテナー」を選択し、必要なパラメータを入力します。
この時アクセスの種類をBLOBにすることで、外部から配置したファイルがアクセス可能になります。
ストレージ アカウントの「概要」、その上部の「コンテナー」を選択した画面

最後に作成したコンテナーにファイルを配置します。
作成したコンテナーを選択し、上にあるアップロードを選択し、ファイルを選択しましょう。
今回は入力ファイルの例として、東京都港区オープンデータより施設情報を配置しました。
コンテナーを選択した画面

これで、Azureのストレージ アカウントを作成し、Azure Blob Storage上に入力ファイルを配置することができました。

MySQLサーバーの準備

データの出力先となるデータベースに、今回はMySQLを利用します。
Azure上でMySQLを利用する手段として、デプロイしたVM上に構築することなどが挙げられます。
簡易にMySQLを利用する手段として、今回はAzure Marketplace上でClearDB社が提供するマネージドなMySQLデータベースを利用してみます。

Azure ポータルで「新規」を選択して、「MySQL Database」で検索し、公開元が「ClearDB」、名称が「MySQL データベース」のサービスを見つけます。
あとは必要なパラメータを入力して「作成」を押せばすぐにMySQLが利用できます。
Azure Marketplaceで「MySQL Database」で検索した画面

「MySQL Database」を利用する場合の注意点として、このサービスはAzure Marketplace上でサードパーティが提供するサービスです。
そのため、Azure クレジットが使用されずに代金が別途請求されます。
幸い「MySQL Database」には無料の価格レベル「水星」がありますので、Azureの無料枠で試す場合は、この設定を行うことを忘れないようにしてください。
MySQL Databaseのパラメータ設定中の画面。価格レベルを水星にするのを忘れずに

MySQLのデータベースを作成したら、作成したデータベースを選択し、「設定」>「プロパティ」を参照しましょう。
ホスト、ポート、ユーザー名、パスワードなど、接続に必要な情報が確認できます。
作成したデータベースの「設定」>「プロパティ」を選択した画面

これで、Embulkの出力先にMySQLデータベースを利用する用意ができました。

Embulkのインストール

Azure Blob StorageとMySQLをつなぐEmbulkを、Azure VM上にインストールします。
Azure VMはあらかじめ用意してある下記環境のものを利用します。

使用Azure VM環境

  • CentOS 7.2 64bit
  • Oracle JDKをインストール済(jdk-8u111-linux-x64.rpm)
  • curlインストール済

インストールコマンド

インストールは簡単なコマンドの実行のみで可能です。

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Embulk本体をインストールした後は、入出力に必要なプラグインをEmbulkにインストールします。
今回必要なプラグインはAzure Blob Storage用の入力プラグインと、MySQL用の出力プラグインです。
プラグインをコマンド1つでインストールするだけで、様々な入力に対応できる点がEmbulkの魅力です。

$ embulk gem install embulk-input-azure_blob_storage
$ embulk gem install embulk-output-mysql

Embulkの実行

いよいよEmbulkを実行し、Blob Storage上のファイルをMySQLサーバーにインポートしてみます。

設定ファイル

まずはEmbulkの設定ファイル「load.yml」を用意します。
今回はAzure Blob Storage上のCSVファイルからMySQLのテーブルにデータを転送するため、以下のようになります。
この設定では出力先テーブルが存在しない場合、Embulkが自動的にテーブルを作成してくれるため、事前にMySQL上にテーブルを作成しておく必要はありません。

in:
  type: azure_blob_storage
  account_name: <your-storage-account>
  account_key: <your-primary-access-key>
  container: <your-container>
  path_prefix: minatokushisetsujoho_
  parser:
    type: csv
    delimiter: ','
    quote: '"'
    skip_header_lines: true
    stop_on_invalid_record: true
    default_timezone: Asia/Tokyo
    newline: CRLF
    charset: MS932
    columns:
    - {name: last_updated, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: page_title, type: string}
    - {name: class_code, type: long}
    - {name: first_class, type: string}
    - {name: second_class, type: string}
    - {name: file_path, type: string}
    - {name: area, type: string}
    - {name: introduction, type: string}
    - {name: location, type: string}
    - {name: contact, type: string}
    - {name: opening_hours, type: string}
    - {name: closing_day, type: string}
    - {name: overview, type: string}
    - {name: reception_hours, type: string}
    - {name: access_train, type: string}
    - {name: access_bus, type: string}
    - {name: parking, type: string}
    - {name: bicycle_parking, type: string}
    - {name: image1, type: string}
    - {name: caption1, type: string}
    - {name: image2, type: string}
    - {name: caption2, type: string}
    - {name: image3, type: string}
    - {name: caption3, type: string}
    - {name: image4, type: string}
    - {name: caption4, type: string}
    - {name: image5, type: string}
    - {name: caption5, type: string}
    - {name: latitude, type: string}
    - {name: longitude, type: string}
out:
  type: mysql
  host: <your-host>
  port: <your-port>
  user: <your-user-name>
  password: <your-user-password>
  table: facilities_in_minato_ward
  mode: truncate_insert
  default_timezone: Asia/Tokyo
  column_options:
    latitude: {type: 'DECIMAL(9,6)'}
    longitude: {type: 'DECIMAL(9,6)'}

Blob StorageやMySQLの接続情報、およびCSVのカラム情報は自分の環境に合わせて変更してください。

実行

カレントディレクトリに「load.yml」を配置し、以下のコマンドでEmbulkを実行します。

$ embulk run load.yml

実行結果の確認

Embulkの実行が完了したら、早速テーブルの中身を確認してみましょう!
以下はmysql-clientをインストールし、CLI上で確認した場合の例です。
MySQLデータベースにアクセスする方法はこれ以外にもあるので、自分のお好みの方法で行ってください。
MySQLクライアントでテーブル内のデータを確認している画面

テーブルが作成され、中にデータが格納されていることが確認できました!

まとめ

今回はAzure上のVMにEmbulkをインストールし、Azure Blob Storage上に配置したCSVファイルのデータをMySQLデータベースにインポートしました。
Embulkは日次や1時間毎に実行するようなバッチ処理が得意なので、ローテートした古いログをDBに保存する、日ごとに分割されたログを日次で保存するといったシチュエーションで活用するとよいと思います。

リアルタイムでログをDBに格納するようなパターンもあると思いますので、機会がありましたらFluentdを用いたMySQLへのデータインポートの方法もご紹介したいと思います。
ここまでご覧いただきありがとうございました。

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です