はじめに
サイオステクノロジー技術部の森田です。
Webベースの分析向けノートブックとして, Apache Zeppelin を紹介します。
お手元に仮想マシンのひとつでもあれば,Zeppelinと組み込みSparkを使って簡単にデータ集計・可視化できる内容となっています。
Apache Zeppelin
Spark/Hadoop といった分散処理システムに対してコードを実行し,実行結果をグラフとして描画することができます。インタプリタが豊富で,シェル/SQL/scala/python … と分析で必要とされるインタフェースが揃っており,自分でインタプリタを作成することも可能です。
記録されたコードと実行結果を共有できるため,複数人による共同作業に活用しやすいと言えるでしょう。
本記事では,Spark環境なしの前提ですすめます。
動作環境
Microsoft Azure の仮想マシンを用います。
以下のスペックで作成し,ネットワークセキュリティグループに受信セキュリティ規則(ssh/https許可)を追加しておきます。
- ディストリビューションはCentOS 7.2
- インスタンスタイプはStandard / 4コア / 28GB RAM
ちなみにAzureは 料金計算ツール を使えば1時間単位のコスト算出も朝飯前。 Azure ポータル ではダッシュボードでリソースモニタリングも楽々であります。
閑話休題。作成した仮想マシンにログインして,以下を適用してゆきましょう。
ポートフォワードはお好みで行います。
パッケージ更新
sudo yum update sudo yum upgrade
タイムゾーンを Asia/Tokyo に変更
sudo ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
SE Linux を無効に
sudo setenforce 0 sudo vi /etc/selinux/config -- SELINUX=disabled
Oracle JDKをインストール
curl -OL --header "Cookie: gpw_e24=https%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u112-b15/jdk-8u112-linux-x64.rpm sudo rpm -ivh jdk-8u112-linux-x64.rpm sudo touch /etc/profile.d/jdk.sh vi /etc/profile.d/jdk.sh -- export JAVA_HOME=/usr/java/default source /etc/profile.d/jdk.sh
(Optional) firewalldを有効にしてポート80を8080へフォワード
sudo firewall-cmd --state sudo systemctl enable firewalld sudo systemctl start firewalld sudo systemctl status firewalld sudo firewall-cmd --add-service=https --zone=public --permanent sudo firewall-cmd --zone="public" --add-forward-port=port=80:proto=tcp:toport=8080 --permanent sudo firewall-cmd --zone="public" --add-interface=eth0 --permanent sudo firewall-cmd --reload sudo firewall-cmd --get-active-zones sudo firewall-cmd --zone=public --list-all
インストール
今回は Z-Manager という簡易インストーラを用いて Zeppelin をインストールしてみましょう。
Zeppelinをインストールしたいディレクトリで Z-Manager のサイトに記載のあるワンライナーを実行します。
curl -fsSL https://raw.githubusercontent.com/NFLabs/z-manager/master/zeppelin-installer.sh | bash
インタラクティブUI(シェル)で Sparkバージョンの選択や構築済み環境を指定します。Sparkスタンドアロンにより,Zeppelinを試すハードルを下げてくれます。
以下の例ではアドバンスドモードで任意バージョンのSparkをインストールしました (Z-Managerで用意されている最新)。
Welcome to Z-Manager! Select the type of Zeppelin installation: 1. Default (Hadoop 2.4.0; Spark 1.3.1; local mode) Good for quickstart: does not require local Spark installation does not require external Spark cluster 2. Advanced (pick versions and adjust configuration to your cluster) > 2 Please select one of the following Spark releases 1. Spark 1.3.1 2. Spark 1.3.0 3. Spark 1.4.0 4. Spark 1.4.1 (latest) > 4 Please select one of the supported Hadoop releases 1. Hadoop 1.0.4 2. Hadoop 2.7.1 > 2
既存Sparkクラスタを使いたい場合はYesを選択後,次の質問で参照先のパスを指定します。
Do you want to configure external Spark cluster? y(es)/n(o) > n
Zeppelin起動ポートのデフォルトは8080です。
尚,仮想マシン作成時にfirewalldを有効にしてポート80での接続を可能にしています。
Please enter Zeppelin port number: 8080
インストール前に設定項目の確認を求められます。
Configuration is finished now, please review the options you have chosen before installation begins: Spark version: 1.4.1 Hadoop version: 2.7.1 Spark master URL: None Resource manager: Spark standalone Zeppelin port: 8080 y(es)/n(o) ? > y
選択したバージョンのアーカイブが自動ダウンロード・展開されたらインストール完了です。
Initiating installation... [2016-12-01T14:06:10+0900]: Downloading Apache Zeppelin (incubating) from zeppelin-0.6.0-incubating-SNAPSHOT-spark1.4.1-hadoop2.7.1.tar.gz... ####################################################################### 100.0% [2016-12-01T14:06:17+0900]: Done [2016-12-01T14:06:17+0900]: Unpack zeppelin-0.6.0-incubating-SNAPSHOT-spark1.4.1-hadoop2.7.1.tar.gz to zeppelin-0.6.0-incubating-SNAPSHOT ... [2016-12-01T14:06:22+0900]: Done [2016-12-01T14:06:22+0900]: Downloading Zeppelin configuration file ... ####################################################################### 100.0% [2016-12-01T14:06:23+0900]: Done [2016-12-01T14:06:23+0900]: Pyspark found at /python:/python/lib/py4j-0.8.2.1-src.zip [2016-12-01T14:06:23+0900]: Reading the zeppelin-env.sh [2016-12-01T14:06:23+0900]: Done To run Apache Zeppelin (incubating) now do: ./bin/zeppelin-daemon.sh start and visit https://localhost:8080
いざ起動。
./zeppelin-0.6.0-incubating-SNAPSHOT/bin/zeppelin-daemon.sh start
仮想マシンに割り当てられたパブリックIPへhttpsアクセスし,トップページを表示します。
Create new note
をクリックしてノートを作成しましょう。
データロードとSQLの実行
今回は,NY市が提供しているタクシー情報の一部 NYC Taxi & Limousine Commission – Trip Record Data を取得し, Spark の DataFrame として扱ってみます。
作成したノートのパラグラフにコードを入力し, Shift+Enter
で逐次実行します。
1. サンプルデータの取得
NYCタクシー利用者の乗降地点,時間,料金といった情報が含まれたCSVファイルを取得します。
%sh wget https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2015-01.csv -nv
2. データのロード
DataFrameを作成して一時テーブルとして登録します。
対象CSVファイルはカラム多数のため,参照しない緯度・経度などを省いて定義しています。
import java.sql.Timestamp val tripText = sc.textFile("green_tripdata_2015-01.csv") case class Trip(vendorId:Integer, pickupDatetime:Timestamp, dropoffDatetime:Timestamp, passengerCount:Integer, tripDistance:Double, fareAmount:Double, extra:Double, mtaTax:Double, tipAmount:Double, tollsAmount:Double, improvementSurcharge:Double, totalAmount:Double) val trip = tripText.map(s=>s.split(",")).filter(s=>s(0)!="VendorID").map( s=>Trip(s(0).toInt, Timestamp.valueOf(s(1)), Timestamp.valueOf(s(2)), s(9).toInt, s(10).toDouble, s(11).toDouble, s(12).toDouble, s(13).toDouble, s(14).toDouble, s(15).toDouble, s(17).toDouble, s(18).toDouble ) ) trip.toDF().registerTempTable("trip")
データロードを実行。エラーがないことを確認します。
3. SQLの実行
出力結果の可視化として棒グラフ・円グラフなどが用意されています。
各パラグラフは外部リンクとして共有し,IFrameに表示させるといった使い方ができます。
乗客数でグルーピング
%sql select passengerCount, count(1) from trip where passengerCount != 0 group by passengerCount
移動距離の平均
%sql select passengerCount, avg(tripDistance) from trip where passengerCount != 0 group by passengerCount
時間帯ごとの乗客数累計
%sql select hour(pickupDatetime) as HOUR, count(1) from trip where passengerCount != 0 group by hour(pickupDatetime)
上記の実行結果をグラフ化
4. クロス集計
時間帯と乗客数でクロス集計した結果をグラフに反映してみます。
%sql select hour(pickupDatetime) as HOUR ,sum(case when passengerCount=1 then 1 else 0 end) as p1 ,sum(case when passengerCount=2 then 1 else 0 end) as p2 ,sum(case when passengerCount=3 then 1 else 0 end) as p3 ,sum(case when passengerCount=4 then 1 else 0 end) as p4 ,sum(case when passengerCount=5 then 1 else 0 end) as p5 ,sum(case when passengerCount=6 then 1 else 0 end) as p6 ,sum(case when passengerCount=7 then 1 else 0 end) as p7 ,sum(case when passengerCount=8 then 1 else 0 end) as p8 ,sum(case when passengerCount=9 then 1 else 0 end) as p9 from trip where passengerCount != 0 group by hour(pickupDatetime)
上記クエリの結果。これではさみしい。
対象フィールドを選んでグラフに反映します。
5. フォームの利用
Dynamic Form を使って入力値をダイアログで指定できます。
%sql select hour(pickupDatetime) as HOUR, count(1) from trip where passengerCount = ${passenger=2} group by hour(pickupDatetime)
デフォルト値=2 のクエリに対して任意値=5 を入力して再実行。
おわりに
エクセルの複雑なピボットテーブルやTableauの華麗なグラフを再現するものではありませんが,分析作業の一助として期待しています。
ノートブックを共有してレビューと改善の履歴を残すほか,本番環境ロード前にデータ傾向を粗めにチェックするなど簡便な使い方ができるのではないでしょうか。