このラボでは、Amazon EMR を使用して 2015 年のニューヨークのタクシーデータセットを処理する Spark ジョブを実行します。この完全なデータセットには、ニューヨーク州ニューヨークのタクシー乗車に関する 2009 年~2016 年までのすべてのデータが含まれています。ラボで使用されるデータセットは、2015 年 6 月のデータのサブセットのみです。ラボで処理する csv 形式のデータセットには、約 1 億 2000 万件のレコードがあります。
Spark ジョブは、Spark DataFrames API を活用して、ダーティデータを読み取り、処理します。また、Spark ジョブは、データセットを Amazon S3 バケットに保存する前に、データの形式とデータの整合性を検証します。出力データは、Amazon S3 バケットに同じ形式 (*.csv) で保存されます。処理されたデータは、さらなる分析のために Amazon Redshift にロードされます。
目標
このラボを完了すると、以下の操作を実行できるようになります。
所要時間
このラボの所要時間は約 45 分です。
注意
シナリオ

このラボでは、上記のソリューションのハイライトされたパートを実行します。このラボでは、以下のことを行います。
Qwiklabs画面の右にある [ラボを開始] をクリックして、ラボを起動します。
Qwiklabs画面の左側に表示されている[コンソールを開く] をクリックします。
Qwiklabs画面の左側に表示されている認証情報を使用して AWS マネジメントコンソールにサインインします。
このラボの間は、リージョンを変更しないでください。
このタスクでは、Amazon S3 バケットを作成して、変換されたデータ、およびデータ処理に使用されるスクリプトを保存します。
Spark アプリケーションを実行する前に Python が処理するスクリプトと変換データを保持する Amazon S3 バケットを作成します。
Amazon S3 の各バケットには一意の名前が必要になります。そこで、ランダムな数値をバケット名に追加します。
[1][AWS マネジメントコンソール] の [サービス] で [S3] をクリックします。
[2] [バケットを作成する] をクリックします。
[3][バケット名] には、次のように入力します。spark-lab-123 (123 の部分は乱数で置き換え)
[4][作成] をクリックします。
[5] バケット名をクリックします。
[6] [フォルダの作成] をクリックします。
[7] scriptsという名前のフォルダーを作成します。
Amazon EMR クラスターがアクセスできるように、Amazon S3 バケットに Python スクリプトをアップロードします。
[8] このリンクを右クリックしてファイルをコンピュータにダウンロードします。pyspark-lab6.py
[9][S3 マネジメントコンソール] で [scripts] フォルダをクリックします。
[10] [アップロード] をクリックして、ダウンロードしたばかりのファイルをアップロードします。
このタスクでは、Spark ジョブを実行するために 1 つのステップを Amazon EMR クラスターに追加します。Amazon S3 バケットにアップロードされたスクリプトを Spark アプリケーションとして使用します。スクリプトは Python で書かれており、DataFrame API を使用しています。スクリプトはサンプルのデータセットから誤ったデータポイントを除去します。例えば海上を指し示しているデータポイントなどです。スクリプトは Amazon S3 バケットのデータセットを処理し、保持します。
[11][サービス] から [EMR] をクリックします。
[12][クラスター一覧] ページで、[labcluster] をクリックします。
[13][ステップ] タブをクリックしてから [ステップの追加] をクリックします。
[14][ステップの追加] ダイアログボックスで、次のように設定します。
New_York_Taxi_Preprocesss3://spark-lab-123/scripts/pyspark-lab6.py
s3://spark-lab-123/output
注意 出力ロケーション (s3://spark-lab-123/output) が既に存在する場合、ストリーミングステップは失敗します。この出力フォルダを自分で作成しないでください。もし既に作成してしまった場合は、ステップを追加する前にフォルダを削除してください。
[15][追加] をクリックします。
[16] ジョブのステータスが [完了] になるまで待ちます。 更新を 30 秒ごとにクリックしてステータスを更新できます。
これで、ジョブの出力を表示できるようになりました。
[17][サービス] で [S3] をクリックします。
[18][spark-lab-xxx] バケットをクリックして、[output] フォルダに移動します。
[19][part-] で始まる出力ファイルの 1 つに移動し、そのファイルをコンピュータに [ダウンロード] します。
[20] メモ帳などのテキストエディタでファイルを開きます。時間に余裕がある場合は、他の出力ファイルをダウンロードして開くことができます。
Spark ジョブから変換されたデータは、最も混雑した場所をヒートマップで表すなどの、可視化データを作成するために使用できます。
SSH クライアントと Amazon EC2 キーペアのプライベートキーを使用して、Amazon EMR マスターノードに接続します。
Amazon EMR クラスターのパブリック DNS 名を取得します。
[21][サービス] から [EMR] をクリックします。
[22][クラスター一覧] ページで、[labcluster] をクリックします。
[23] マスターパブリック DNS 名をクリップボードにコピーします。後で使用できるように、テキストエディタに貼り付けます。
この手順は Windows ユーザーのみを対象としています。
Mac または Linux を使用している場合は、このセクションを省略して次のセクションに進んでください。
[24] ラボの開始ボンタンを押した画面の左側にある、[ PPK形式でダウンロード] をクリックします。
[25] 保存場所を尋ねられた場合は任意のディレクトリを指定して、ファイルを保存します。
PuTTY を使用して Amazon EC2 インスタンスに SSH で接続します。
コンピュータに PuTTY がインストールされていない場合は、ここからダウンロードしてください。
[26] PuTTY.exe を開きます。
[27] 以下のようにして、PuTTY がタイムアウトしないように設定します。
これで、PuTTY セッションを長時間確立したままにできます。
hadoopと入力します。これは、EC2 スタンスで使用される通常の ec 2-user ログインとは異なります。
これにより、EC2 インスタンスに接続されます。
Windows ユーザーはこちらをクリックして次のタスクに進んでください。
この手順は Mac または Linux ユーザーのみを対象としています。Windows を使用している場合は、次のタスクに進んでください。
[30] ラボの開始ボンタンを押した画面の左側にある、[ PEM形式でダウンロード] をクリックします。
[31] ディレクトリを指定してファイルを保存します。
[32] 以下のコマンドをテキストエディタにコピーします。
chmod 400 KEYPAIR.pem
ssh -i KEYPAIR.pem hadoop@MASTER-PUBLIC-DNS
以下に例を示します。
chmod 400 Downloads/qwikLABS-L123-1234.pem
ssh -i Downloads/qwikLABS-L123-1234.pem hadoop@ec2-54-111-222-333.us-west-2.compute.amazonaws.com
[34] 編集したコマンドをターミナルウィンドウに貼り付けて実行します。
[35] リモート SSH サーバーへの最初の接続を許可するかを確認するメッセージが表示されたら、yesと入力します。
認証にキーペアを使用しているため、パスワードの入力は要求されません。
このタスクでは、Amazon Redshift に 2015 年 6 月のニューヨークのタクシーデータをロードします。
必要な JAR ファイルをダウンロードして、spark シェルにログインします。
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
aws s3 cp s3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-6-spark/scripts/jar/ . --recursive
spark-shell --jars minimal-json-0.9.5-SNAPSHOT.jar,/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,spark-avro_2.11-3.0.1.jar,spark-redshift_2.11-2.0.1.jar
1 分後に [scala] プロンプトが表示されます。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
このセクションでは、Spark-Redshift データ転送のための一時的な AWS 認証情報を作成します。
import org.apache.spark.sql._
import com.amazonaws.auth._
val provider = new InstanceProfileCredentialsProvider()
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
Output:
provider: com.amazonaws.auth.InstanceProfileCredentialsProvider = InstanceProfileCredentialsProvider credentials: com.amazonaws.auth.AWSSessionCredentials = com.amazonaws.auth.BasicSessionCredentials@
InstanceProfileCredentialsProvider.getCredentials では、発信者が AWS リクエストを認証するために使用できる AWSCredentials を返します。AWSCredentialsProvider の各実装では、認証情報をロードするための独自の戦略を選択できます。例えば、既存のキー管理システムから認証情報をロードする実装もあれば、認証情報のローテーションがなされたときに新しい認証情報をロードする実装もあります。
val token = credentials.getSessionToken
val awsAccessKey = credentials.getAWSAccessKeyId
val awsSecretKey = credentials.getAWSSecretKey
Output:
token: String = <Personal Token> awsAccessKey: String = <Personal Key> awsSecretKey: String = <Personal Secret Key>
このセクションでは、処理された 2015 年 6 月のデータを Spark DataFrame にロードして、タクシー乗車回数の合計と乗客数に基づく平均チップ金額を導き出すクエリを行います。
val s3_location = "s3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-6-spark/processed_data.csv"
val df = spark.read.option("header","true").option("inferSchema","true").csv(s3_location)
このステップは数分で完了します。
DataFrame は複数の列によって構成されたデータの分散型コレクションです。リレーショナルデータベースにおけるテーブル、R や Python のデータフレームと概念的には同じものですが、実装内部ではよりリッチな最適化が行われています。
df.printSchema()
df.filter(df("tip_amount") > 0).groupBy("passenger_count").agg(mean("tip_amount")).sort("passenger_count").show()
このステップは数分で完了します。
df.groupBy("passenger_count").count().sort("passenger_count").show()
このセクションでは、spark-redshift コネクタを使用して、DataFrame から Amazon Redshift クラスターにデータを転送します。
まず、Amazon Redshift クラスターの JDBC URL を取得します。
AWS マネジメントコンソールで、[サービス] から [Redshift] をクリックします。
[48] 左側のナビゲーションペインで [クラスター] をクリックします。
[49] qls で始まるクラスターのリンクをクリックします。
[50][Configuration] タブで [Cluster Database Properties] までスクロールします。
Redshift の新インタフェースを用いている場合は、[プロパティ] タブをクリックし、「接続の詳細」 から 「すべての接続の詳細を表示します」をクリックします。
[51][JDBC URL] の値をテキストエディタにコピーします。
[52] SSH セッションで次のコマンドを実行し、[JDBC_URL を Amazon Redshift コンソールからコピーした JDBC URL に置き換え] します。
val jdbc_url ="JDBC_URL"+"?user=master&password=Redshift123"
val temp_dir = "s3://spark-lab-123/temp/"
df.write.format("com.databricks.spark.redshift").option("url", jdbc_url).option("dbtable","nytaxi").option("tempdir",temp_dir).option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token",token).save()
データが Amazon Redshift にロードされる間に次の手順に進みます。
[55] Redshift コンソールで、[Loads] タブをクリックします。
Redshift の新UIを用いている場合は、[クエリのモニタリング] タブの 「Queries and loads」から ロード を選択します。
ステータスが [COMPLETED] に変わるまで待ちます。所要時間は約 7 分です。
データは Redshift クラスターにエクスポートされたので、通常の SQL コマンドを使用してクエリすることができます。
Redshift への接続には [pgweb] を使用します。
お好みの SQL クライアントがコンピュータに既にインストールされている場合は、[pgweb] の代わりにその SQL クライアントを使用できます。ただし、このラボの手順は、[pgweb] インターフェイスに基づいています。
これは、pgweb ソフトウェアを実行しているウェブサーバーの IP アドレスです。
ウェブブラウザで新しいタブを開き、IP アドレスを貼り付けて Enter キーを押します。
pgweb のログイン画面が表示されます。
masterRedshift123dev5439 (デフォルトの値とは異なります)[Port] の値を変更できない場合は、SSL を無効にしてからやり直してください。
これで、[nytaxi] テーブルに保存されたデータをクエリすることができます。
SELECT count(*) FROM nytaxi;
1,200 万行以上のデータがあります。
SELECT * FROM nytaxi LIMIT 10;
Spark で以前に実行したものと同様のクエリを実行できるようになりました。
SELECT
passenger_count,
AVG(tip_amount)
FROM nytaxi
WHERE
tip_amount > 0
GROUP BY passenger_count
ORDER BY passenger_count;
結果を Spark コマンドの出力と比較します。この時点ではまだ SSH ウィンドウに表示されます。
SELECT
passenger_count,
COUNT(*)
FROM nytaxi
GROUP BY passenger_count
ORDER BY passenger_count;
結果を Spark コマンドの出力と比較します。この時点ではまだ SSH ウィンドウに表示されます。
Amazon Redshift は、Spark よりも速く計算しますが、データをクラスターにロードするのに余分な時間が掛かります。
お疲れ様でした。 このラボを完了しました。以下の手順に従って、ラボ環境をクリーンアップします。
[63] AWS マネジメントコンソールからサインアウトするには、コンソール上部のメニューバーで [awsstudent] をクリックし、[サインアウト] をクリックします。
[64] Qwiklabs ページで [ラボを終了] をクリックします。