ラボ 6 - Amazon EMR で Apache Spark を使用してニューヨークのタクシーデータを処理する

このラボでは、Amazon EMR を使用して 2015 年のニューヨークのタクシーデータセットを処理する Spark ジョブを実行します。この完全なデータセットには、ニューヨーク州ニューヨークのタクシー乗車に関する 2009 年~2016 年までのすべてのデータが含まれています。ラボで使用されるデータセットは、2015 年 6 月のデータのサブセットのみです。ラボで処理する csv 形式のデータセットには、約 1 億 2000 万件のレコードがあります。

Spark ジョブは、Spark DataFrames API を活用して、ダーティデータを読み取り、処理します。また、Spark ジョブは、データセットを Amazon S3 バケットに保存する前に、データの形式とデータの整合性を検証します。出力データは、Amazon S3 バケットに同じ形式 (*.csv) で保存されます。処理されたデータは、さらなる分析のために Amazon Redshift にロードされます。

目標

このラボを完了すると、以下の操作を実行できるようになります。

所要時間

このラボの所要時間は約 45 分です。

注意


シナリオ

このラボでは、上記のソリューションのハイライトされたパートを実行します。このラボでは、以下のことを行います。


AWS マネジメントコンソールにアクセスする

このラボの間は、リージョンを変更しないでください。


タスク 1: Amazon S3 バケットに Python スクリプトをアップロードする

このタスクでは、Amazon S3 バケットを作成して、変換されたデータ、およびデータ処理に使用されるスクリプトを保存します。

Amazon S3 出力バケットを作成する

Spark アプリケーションを実行する前に Python が処理するスクリプトと変換データを保持する Amazon S3 バケットを作成します。

Amazon S3 の各バケットには一意の名前が必要になります。そこで、ランダムな数値をバケット名に追加します。

  1. [1][AWS マネジメントコンソール] の [サービス] で [S3] をクリックします。

  2. [2] [バケットを作成する] をクリックします。

  3. [3][バケット名] には、次のように入力します。spark-lab-123 (123 の部分は乱数で置き換え)

  4. [4][作成] をクリックします。

  5. [5] バケット名をクリックします。

  6. [6] [フォルダの作成] をクリックします。

  7. [7] scriptsという名前のフォルダーを作成します。

Amazon S3 バケットに Python スクリプトをアップロードする

Amazon EMR クラスターがアクセスできるように、Amazon S3 バケットに Python スクリプトをアップロードします。

  1. [8] このリンクを右クリックしてファイルをコンピュータにダウンロードします。pyspark-lab6.py

  2. [9][S3 マネジメントコンソール] で [scripts] フォルダをクリックします。

  3. [10] [アップロード] をクリックして、ダウンロードしたばかりのファイルをアップロードします。


タスク 2: Amazon EMR で Spark ジョブを実行する

このタスクでは、Spark ジョブを実行するために 1 つのステップを Amazon EMR クラスターに追加します。Amazon S3 バケットにアップロードされたスクリプトを Spark アプリケーションとして使用します。スクリプトは Python で書かれており、DataFrame API を使用しています。スクリプトはサンプルのデータセットから誤ったデータポイントを除去します。例えば海上を指し示しているデータポイントなどです。スクリプトは Amazon S3 バケットのデータセットを処理し、保持します。

  1. [11][サービス] から [EMR] をクリックします。

  2. [12][クラスター一覧] ページで、[labcluster] をクリックします。

  3. [13][ステップ] タブをクリックしてから [ステップの追加] をクリックします。

  4. [14][ステップの追加] ダイアログボックスで、次のように設定します。

注意 出力ロケーション (s3://spark-lab-123/output) が既に存在する場合、ストリーミングステップは失敗します。この出力フォルダを自分で作成しないでください。もし既に作成してしまった場合は、ステップを追加する前にフォルダを削除してください。

  1. [15][追加] をクリックします。

  2. [16] ジョブのステータスが [完了] になるまで待ちます。 更新を 30 秒ごとにクリックしてステータスを更新できます。

これで、ジョブの出力を表示できるようになりました。

  1. [17][サービス] で [S3] をクリックします。

  2. [18][spark-lab-xxx] バケットをクリックして、[output] フォルダに移動します。

  3. [19][part-] で始まる出力ファイルの 1 つに移動し、そのファイルをコンピュータに [ダウンロード] します。

  4. [20] メモ帳などのテキストエディタでファイルを開きます。時間に余裕がある場合は、他の出力ファイルをダウンロードして開くことができます。

Spark ジョブから変換されたデータは、最も混雑した場所をヒートマップで表すなどの、可視化データを作成するために使用できます。


タスク 3: SSH を使用して Hadoop マスターノードに接続する

SSH クライアントと Amazon EC2 キーペアのプライベートキーを使用して、Amazon EMR マスターノードに接続します。

Amazon EMR クラスターのパブリック DNS 名を取得する

Amazon EMR クラスターのパブリック DNS 名を取得します。

  1. [21][サービス] から [EMR] をクリックします。

  2. [22][クラスター一覧] ページで、[labcluster] をクリックします。

  3. [23] マスターパブリック DNS 名をクリップボードにコピーします。後で使用できるように、テキストエディタに貼り付けます。

Windows ユーザー: SSH を使って接続する

この手順は Windows ユーザーのみを対象としています。

Mac または Linux を使用している場合は、このセクションを省略して次のセクションに進んでください

  1. [24] ラボの開始ボンタンを押した画面の左側にある、[ PPK形式でダウンロード] をクリックします。

  2. [25] 保存場所を尋ねられた場合は任意のディレクトリを指定して、ファイルを保存します。

PuTTY を使用して Amazon EC2 インスタンスに SSH で接続します。

コンピュータに PuTTY がインストールされていない場合は、ここからダウンロードしてください。

  1. [26] PuTTY.exe を開きます。

  2. [27] 以下のようにして、PuTTY がタイムアウトしないように設定します。

これで、PuTTY セッションを長時間確立したままにできます。

  1. [28] PuTTY セッションの設定を行います。
  1. [29] ユーザー名の入力を求められたら、hadoopと入力します。

これは、EC2 スタンスで使用される通常の ec 2-user ログインとは異なります。

これにより、EC2 インスタンスに接続されます。

Windows ユーザーはこちらをクリックして次のタスクに進んでください。

Mac および Linux ユーザー

この手順は Mac または Linux ユーザーのみを対象としています。Windows を使用している場合は、次のタスクに進んでください。

  1. [30] ラボの開始ボンタンを押した画面の左側にある、[ PEM形式でダウンロード] をクリックします。

  2. [31] ディレクトリを指定してファイルを保存します。

  3. [32] 以下のコマンドをテキストエディタにコピーします。

chmod 400 KEYPAIR.pem ssh -i KEYPAIR.pem hadoop@MASTER-PUBLIC-DNS
  1. [33] コマンドを編集します。

以下に例を示します。

chmod 400 Downloads/qwikLABS-L123-1234.pem ssh -i Downloads/qwikLABS-L123-1234.pem hadoop@ec2-54-111-222-333.us-west-2.compute.amazonaws.com
  1. [34] 編集したコマンドをターミナルウィンドウに貼り付けて実行します。

  2. [35] リモート SSH サーバーへの最初の接続を許可するかを確認するメッセージが表示されたら、yesと入力します。

認証にキーペアを使用しているため、パスワードの入力は要求されません。


タスク 4: Spark を使用してニューヨークのタクシーデータをクエリする

このタスクでは、Amazon Redshift に 2015 年 6 月のニューヨークのタクシーデータをロードします。

必要な JAR ファイルをダウンロードして、spark シェルにログインします。

  1. [36] 次のようなメッセージは、マスターノードへの接続が成功したことを示します。
__| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
  1. [37] 必要な JAR ファイルをダウンロードするには、次のコマンドを実行します。JAR ファイルは spark シェルの開始時にインポートされます。
aws s3 cp s3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-6-spark/scripts/jar/ . --recursive
  1. [38] 次のコマンドを実行して [spark-shell] を起動します。
spark-shell --jars minimal-json-0.9.5-SNAPSHOT.jar,/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,spark-avro_2.11-3.0.1.jar,spark-redshift_2.11-2.0.1.jar

1 分後に [scala] プロンプトが表示されます。

Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.0.1 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121) Type in expressions to have them evaluated. Type :help for more information. scala>

Spark-Redshift 接続のための AWS 認証情報を作成する

このセクションでは、Spark-Redshift データ転送のための一時的な AWS 認証情報を作成します。

  1. [39] spark-shell では、以下のコマンドをコピーしてインポートライブラリーに貼り付けます。
import org.apache.spark.sql._ import com.amazonaws.auth._
  1. [40] spark-shell で、以下のコマンドをコピーして貼り付け、IAM インスタンスプロファイルのための AWS 認証情報を作成します。
val provider = new InstanceProfileCredentialsProvider() val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]

Output:

provider: com.amazonaws.auth.InstanceProfileCredentialsProvider = InstanceProfileCredentialsProvider credentials: com.amazonaws.auth.AWSSessionCredentials = com.amazonaws.auth.BasicSessionCredentials@

InstanceProfileCredentialsProvider.getCredentials では、発信者が AWS リクエストを認証するために使用できる AWSCredentials を返します。AWSCredentialsProvider の各実装では、認証情報をロードするための独自の戦略を選択できます。例えば、既存のキー管理システムから認証情報をロードする実装もあれば、認証情報のローテーションがなされたときに新しい認証情報をロードする実装もあります。

  1. [41] spark-shell で、以下のコマンドを実行して、AWS トークン、アクセスキーとシークレットキーを作成します。
val token = credentials.getSessionToken val awsAccessKey = credentials.getAWSAccessKeyId val awsSecretKey = credentials.getAWSSecretKey

Output:

token: String = <Personal Token> awsAccessKey: String = <Personal Key> awsSecretKey: String = <Personal Secret Key>

Spark シェルを使用して処理されたデータを調べる

このセクションでは、処理された 2015 年 6 月のデータを Spark DataFrame にロードして、タクシー乗車回数の合計と乗客数に基づく平均チップ金額を導き出すクエリを行います。

  1. [42] spark-shell で、次のコマンドを実行して、Amazon S3 のロケーションパスの変数を作成します。
val s3_location = "s3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-6-spark/processed_data.csv"
  1. [43] 以下のコマンドを実行して、指定した S3 のロケーションから csv を読み込んで DataFrame を作成します。
val df = spark.read.option("header","true").option("inferSchema","true").csv(s3_location)

このステップは数分で完了します。

DataFrame は複数の列によって構成されたデータの分散型コレクションです。リレーショナルデータベースにおけるテーブル、R や Python のデータフレームと概念的には同じものですが、実装内部ではよりリッチな最適化が行われています。

  1. [44] 次のコマンドを実行して、ロードされるデータのスキーマについて説明します。
df.printSchema()
  1. [45] 以下のコマンドを実行して、乗客数に基づく平均チップ金額を検索します。
df.filter(df("tip_amount") > 0).groupBy("passenger_count").agg(mean("tip_amount")).sort("passenger_count").show()

このステップは数分で完了します。

  1. [46] 次のコマンドを実行して、乗客の数に基づいてタクシーの乗車数を検索します。
df.groupBy("passenger_count").count().sort("passenger_count").show()

タスク 5: Amazon EMR の Spark を使用して Amazon Redshift にデータを転送する

このセクションでは、spark-redshift コネクタを使用して、DataFrame から Amazon Redshift クラスターにデータを転送します。

まず、Amazon Redshift クラスターの JDBC URL を取得します。

AWS マネジメントコンソールで、[サービス] から [Redshift] をクリックします。

  1. [48] 左側のナビゲーションペインで [クラスター] をクリックします。

  2. [49] qls で始まるクラスターのリンクをクリックします。

  3. [50][Configuration] タブで [Cluster Database Properties] までスクロールします。

    Redshift の新インタフェースを用いている場合は、[プロパティ] タブをクリックし、「接続の詳細」 から 「すべての接続の詳細を表示します」をクリックします。

  4. [51][JDBC URL] の値をテキストエディタにコピーします。

  5. [52] SSH セッションで次のコマンドを実行し、[JDBC_URL を Amazon Redshift コンソールからコピーした JDBC URL に置き換え] します。

val jdbc_url ="JDBC_URL"+"?user=master&password=Redshift123"
  1. [53] 次のコマンドを実行し、[spark-lab-123 をお使いの Amazon S3 バケット名に置き換え] します。
val temp_dir = "s3://spark-lab-123/temp/"
  1. [54] 次のコマンドを実行し、Redshift クラスターに DataFrame を書き込みます。
df.write.format("com.databricks.spark.redshift").option("url", jdbc_url).option("dbtable","nytaxi").option("tempdir",temp_dir).option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token",token).save()

データが Amazon Redshift にロードされる間に次の手順に進みます。

  1. [55] Redshift コンソールで、[Loads] タブをクリックします。

    Redshift の新UIを用いている場合は、[クエリのモニタリング] タブの 「Queries and loads」から ロード を選択します。

ステータスが [COMPLETED] に変わるまで待ちます。所要時間は約 7 分です。


タスク 6: Amazon Redshift のデータに対してクエリを実行する

データは Redshift クラスターにエクスポートされたので、通常の SQL コマンドを使用してクエリすることができます。

Redshift への接続には [pgweb] を使用します。

お好みの SQL クライアントがコンピュータに既にインストールされている場合は、[pgweb] の代わりにその SQL クライアントを使用できます。ただし、このラボの手順は、[pgweb] インターフェイスに基づいています。

  1. [56] Qwiklab画面の左側に表示されている [pgweb] IP アドレスをコピーします。

これは、pgweb ソフトウェアを実行しているウェブサーバーの IP アドレスです。

ウェブブラウザで新しいタブを開き、IP アドレスを貼り付けて Enter キーを押します。

pgweb のログイン画面が表示されます。

  1. [58] 以下のように設定します。

[Port] の値を変更できない場合は、SSL無効にしてからやり直してください。

データをクエリする

これで、[nytaxi] テーブルに保存されたデータをクエリすることができます。

  1. [59] 次のコマンドを実行して、テーブル内の行数をカウントします。
SELECT count(*) FROM nytaxi;

1,200 万行以上のデータがあります。

  1. [60] 次のコマンドを実行して、データのサンプルを表示します。
SELECT * FROM nytaxi LIMIT 10;

Spark で以前に実行したものと同様のクエリを実行できるようになりました。

  1. [61] 以下のクエリを実行して、乗客数に基づいて平均チップ金額を検索します。
SELECT passenger_count, AVG(tip_amount) FROM nytaxi WHERE tip_amount > 0 GROUP BY passenger_count ORDER BY passenger_count;

結果を Spark コマンドの出力と比較します。この時点ではまだ SSH ウィンドウに表示されます。

  1. [62] 以下のクエリを実行して、乗客の数に基づいてタクシーの乗車数を検索します。
SELECT passenger_count, COUNT(*) FROM nytaxi GROUP BY passenger_count ORDER BY passenger_count;

結果を Spark コマンドの出力と比較します。この時点ではまだ SSH ウィンドウに表示されます。

Amazon Redshift は、Spark よりも速く計算しますが、データをクラスターにロードするのに余分な時間が掛かります。


ラボを終了する

お疲れ様でした。 このラボを完了しました。以下の手順に従って、ラボ環境をクリーンアップします。

  1. [63] AWS マネジメントコンソールからサインアウトするには、コンソール上部のメニューバーで [awsstudent] をクリックし、[サインアウト] をクリックします。

  2. [64] Qwiklabs ページで [ラボを終了] をクリックします。