Apache Zeppelinと Sparkでデータの集計と可視化をお手軽に実現する

◆ Live配信スケジュール ◆
サイオステクノロジーでは、Microsoft MVPの武井による「わかりみの深いシリーズ」など、定期的なLive配信を行っています。
⇒ 詳細スケジュールはこちらから
⇒ 見逃してしまった方はYoutubeチャンネルをご覧ください
【4/18開催】VSCode Dev Containersで楽々開発環境構築祭り〜Python/Reactなどなど〜
Visual Studio Codeの拡張機能であるDev Containersを使ってReactとかPythonとかSpring Bootとかの開発環境をラクチンで構築する方法を紹介するイベントです。
https://tech-lab.connpass.com/event/311864/

はじめに

サイオステクノロジー技術部の森田です。
Webベースの分析向けノートブックとして, Apache Zeppelin を紹介します。
お手元に仮想マシンのひとつでもあれば,Zeppelinと組み込みSparkを使って簡単にデータ集計・可視化できる内容となっています。

Apache Zeppelin

Spark/Hadoop といった分散処理システムに対してコードを実行し,実行結果をグラフとして描画することができます。インタプリタが豊富で,シェル/SQL/scala/python … と分析で必要とされるインタフェースが揃っており,自分でインタプリタを作成することも可能です。
記録されたコードと実行結果を共有できるため,複数人による共同作業に活用しやすいと言えるでしょう。
本記事では,Spark環境なしの前提ですすめます。

動作環境

Microsoft Azure の仮想マシンを用います。
以下のスペックで作成し,ネットワークセキュリティグループに受信セキュリティ規則(ssh/https許可)を追加しておきます。

  • ディストリビューションはCentOS 7.2
  • インスタンスタイプはStandard / 4コア / 28GB RAM

ちなみにAzureは 料金計算ツール を使えば1時間単位のコスト算出も朝飯前。 Azure ポータル ではダッシュボードでリソースモニタリングも楽々であります。

閑話休題。作成した仮想マシンにログインして,以下を適用してゆきましょう。
ポートフォワードはお好みで行います。

パッケージ更新

sudo yum update
sudo yum upgrade

タイムゾーンを Asia/Tokyo に変更

sudo ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime

SE Linux を無効に

sudo setenforce 0
sudo vi /etc/selinux/config
--
SELINUX=disabled

Oracle JDKをインストール

curl -OL --header "Cookie: gpw_e24=https%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm
sudo rpm -ivh jdk-8u112-linux-x64.rpm
sudo touch /etc/profile.d/jdk.sh
vi /etc/profile.d/jdk.sh
--
export JAVA_HOME=/usr/java/default

source /etc/profile.d/jdk.sh

(Optional) firewalldを有効にしてポート80を8080へフォワード

sudo firewall-cmd --state
sudo systemctl enable firewalld
sudo systemctl start firewalld
sudo systemctl status firewalld

sudo firewall-cmd --add-service=https --zone=public --permanent
sudo firewall-cmd --zone="public" --add-forward-port=port=80:proto=tcp:toport=8080 --permanent
sudo firewall-cmd --zone="public" --add-interface=eth0 --permanent
sudo firewall-cmd --reload

sudo firewall-cmd --get-active-zones
sudo firewall-cmd --zone=public --list-all

インストール

今回は Z-Manager という簡易インストーラを用いて Zeppelin をインストールしてみましょう。
Zeppelinをインストールしたいディレクトリで Z-Manager のサイトに記載のあるワンライナーを実行します。

curl -fsSL https://raw.githubusercontent.com/NFLabs/z-manager/master/zeppelin-installer.sh | bash

インタラクティブUI(シェル)で Sparkバージョンの選択や構築済み環境を指定します。Sparkスタンドアロンにより,Zeppelinを試すハードルを下げてくれます。
以下の例ではアドバンスドモードで任意バージョンのSparkをインストールしました (Z-Managerで用意されている最新)。

Welcome to Z-Manager!

Select the type of Zeppelin installation:
1. Default (Hadoop 2.4.0; Spark 1.3.1; local mode)
   Good for quickstart:
       does not require local Spark installation
       does not require external Spark cluster
2. Advanced (pick versions and adjust configuration to your cluster)
> 2

Please select one of the following Spark releases
1. Spark 1.3.1
2. Spark 1.3.0
3. Spark 1.4.0
4. Spark 1.4.1 (latest)
> 4

Please select one of the supported Hadoop releases
1. Hadoop 1.0.4
2. Hadoop 2.7.1
> 2

既存Sparkクラスタを使いたい場合はYesを選択後,次の質問で参照先のパスを指定します。

Do you want to configure external Spark cluster? y(es)/n(o)
> n

Zeppelin起動ポートのデフォルトは8080です。
尚,仮想マシン作成時にfirewalldを有効にしてポート80での接続を可能にしています。

Please enter Zeppelin port number: 8080

インストール前に設定項目の確認を求められます。

Configuration is finished now,
please review the options you have chosen before installation begins:

Spark version: 1.4.1
Hadoop version: 2.7.1
Spark master URL: None
Resource manager: Spark standalone
Zeppelin port: 8080

y(es)/n(o) ?
> y

選択したバージョンのアーカイブが自動ダウンロード・展開されたらインストール完了です。

Initiating installation...

[2016-12-01T14:06:10+0900]: Downloading Apache Zeppelin (incubating) from zeppelin-0.6.0-incubating-SNAPSHOT-spark1.4.1-hadoop2.7.1.tar.gz...
####################################################################### 100.0%
[2016-12-01T14:06:17+0900]: Done
[2016-12-01T14:06:17+0900]: Unpack zeppelin-0.6.0-incubating-SNAPSHOT-spark1.4.1-hadoop2.7.1.tar.gz to zeppelin-0.6.0-incubating-SNAPSHOT ...
[2016-12-01T14:06:22+0900]: Done
[2016-12-01T14:06:22+0900]: Downloading Zeppelin configuration file ...
####################################################################### 100.0%
[2016-12-01T14:06:23+0900]: Done
[2016-12-01T14:06:23+0900]: Pyspark found at /python:/python/lib/py4j-0.8.2.1-src.zip
[2016-12-01T14:06:23+0900]: Reading the zeppelin-env.sh
[2016-12-01T14:06:23+0900]: Done

To run Apache Zeppelin (incubating) now do:
./bin/zeppelin-daemon.sh start
and visit https://localhost:8080

いざ起動。

./zeppelin-0.6.0-incubating-SNAPSHOT/bin/zeppelin-daemon.sh start

仮想マシンに割り当てられたパブリックIPへhttpsアクセスし,トップページを表示します。
Create new note をクリックしてノートを作成しましょう。

top

データロードとSQLの実行

今回は,NY市が提供しているタクシー情報の一部 NYC Taxi & Limousine Commission – Trip Record Data を取得し, Spark の DataFrame として扱ってみます。

作成したノートのパラグラフにコードを入力し, Shift+Enter で逐次実行します。

input

1. サンプルデータの取得

NYCタクシー利用者の乗降地点,時間,料金といった情報が含まれたCSVファイルを取得します。

%sh wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2015-01.csv -nv

2. データのロード

DataFrameを作成して一時テーブルとして登録します。
対象CSVファイルはカラム多数のため,参照しない緯度・経度などを省いて定義しています。

import java.sql.Timestamp

val tripText = sc.textFile("green_tripdata_2015-01.csv")

case class Trip(vendorId:Integer, pickupDatetime:Timestamp, dropoffDatetime:Timestamp, passengerCount:Integer, tripDistance:Double, fareAmount:Double, extra:Double, mtaTax:Double, tipAmount:Double, tollsAmount:Double, improvementSurcharge:Double, totalAmount:Double)

val trip = tripText.map(s=>s.split(",")).filter(s=>s(0)!="VendorID").map(
  s=>Trip(s(0).toInt,
    Timestamp.valueOf(s(1)),
    Timestamp.valueOf(s(2)),
    s(9).toInt,
    s(10).toDouble,
    s(11).toDouble,
    s(12).toDouble,
    s(13).toDouble,
    s(14).toDouble,
    s(15).toDouble,
    s(17).toDouble,
    s(18).toDouble
  )
)

trip.toDF().registerTempTable("trip")

データロードを実行。エラーがないことを確認します。

dataload

3. SQLの実行

出力結果の可視化として棒グラフ・円グラフなどが用意されています。
各パラグラフは外部リンクとして共有し,IFrameに表示させるといった使い方ができます。

乗客数でグルーピング

%sql select passengerCount, count(1)
from trip where passengerCount != 0 group by passengerCount

移動距離の平均

%sql select passengerCount, avg(tripDistance)
from trip where passengerCount != 0 group by passengerCount

時間帯ごとの乗客数累計

%sql select hour(pickupDatetime) as HOUR, count(1)
from trip where passengerCount != 0 group by hour(pickupDatetime)

上記の実行結果をグラフ化

graph

4. クロス集計

時間帯と乗客数でクロス集計した結果をグラフに反映してみます。

%sql select
  hour(pickupDatetime) as HOUR
  ,sum(case when passengerCount=1 then 1 else 0 end) as p1
  ,sum(case when passengerCount=2 then 1 else 0 end) as p2
  ,sum(case when passengerCount=3 then 1 else 0 end) as p3
  ,sum(case when passengerCount=4 then 1 else 0 end) as p4
  ,sum(case when passengerCount=5 then 1 else 0 end) as p5
  ,sum(case when passengerCount=6 then 1 else 0 end) as p6
  ,sum(case when passengerCount=7 then 1 else 0 end) as p7
  ,sum(case when passengerCount=8 then 1 else 0 end) as p8
  ,sum(case when passengerCount=9 then 1 else 0 end) as p9
from trip where passengerCount != 0 group by hour(pickupDatetime)

上記クエリの結果。これではさみしい。

cross

対象フィールドを選んでグラフに反映します。

pivot

5. フォームの利用

Dynamic Form を使って入力値をダイアログで指定できます。

%sql select hour(pickupDatetime) as HOUR, count(1)
from trip where passengerCount = ${passenger=2} group by hour(pickupDatetime)

デフォルト値=2 のクエリに対して任意値=5 を入力して再実行。

form

おわりに

エクセルの複雑なピボットテーブルやTableauの華麗なグラフを再現するものではありませんが,分析作業の一助として期待しています。
ノートブックを共有してレビューと改善の履歴を残すほか,本番環境ロード前にデータ傾向を粗めにチェックするなど簡便な使い方ができるのではないでしょうか。

アバター画像
About サイオステクノロジーの中の人 41 Articles
サイオステクノロジーで働く中の人です。
ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。


ご覧いただきありがとうございます。
ブログの最新情報はSNSでも発信しております。
ぜひTwitterのフォロー&Facebookページにいいねをお願い致します!



>> 雑誌等の執筆依頼を受付しております。
   ご希望の方はお気軽にお問い合わせください!

Be the first to comment

Leave a Reply

Your email address will not be published.


*


質問はこちら 閉じる