このラボでは、Amazon EMR クラスターで Hive プログラミングフレームワークを使用して、Amazon S3 データの Amazon DynamoDB へのインポート、DynamoDB テーブルの Amazon S3 へのエクスポート、および Amazon DynamoDB と Amazon S3 に保存されているテーブルに対するクエリを実行します。
このラボで使用する Amazon EMR クラスターは、既に起動されていて使用できるようになっています。Amazon EMR および Hive の詳細については、このコース内で後ほど説明します。
目標
このラボを完了すると、以下の操作を実行できるようになります。
所要時間
このラボは、修了までに30 分かかります。
注意
Qwiklabs画面にある [ラボを開始] をクリックして、ラボを起動します。
Qwiklabs画面の左側に表示されている[コンソールを開く] をクリックします。
Qwiklabs画面左側に表示されている認証情報を使用して AWS マネジメントコンソールにサインインします。
このラボの間は、リージョンを変更しないでください。
ラボのこのセクションでは、DynamoDB テーブルを作成し、Amazon EMR や Hive を使用してそのテーブルにデータを入力します。
Amazon DynamoDB の概要
Apache Hadoop データベースと NoSQL データベースとは、相互補完的な技術であり、一緒に使用することでビッグデータを管理、分析、および収益化できる強力なツールボックスとなります。Amazon EMR は Amazon DynamoDB と直接統合されており、しばしば法外な値段になる管理、メンテナンス費用、および初期ハードウェア費用を削減できる統合ソリューションを顧客に提供します。
Amazon EMR の高度な並列環境を使用し、必要な数のサーバーに作業を分散することで、Amazon DynamoDB の内外へ膨大なデータを移動したり、そのデータに対して高度な分析を実行したりすることができるようになりました。
また Amazon EMR では Hive と呼ばれる、SQL ライクな Hadoop 向けエンジンが使用されるため、基本的な SQL を知ってさえいれば、ハッシュキーに基づいた最適なデータ分割の予想、DynamoDB への適切なフィルターのプッシュ、および Amazon EMR クラスター内の全インスタンスへのタスク分散など、分散アプリケーションの複雑な作業を Amazon EMR を使用して処理できます。
このラボでは、Amazon EMR を使用して、Amazon S3 データの Amazon DynamoDB へのインポート、DynamoDB テーブルの Amazon S3 へのエクスポート、および DynamoDB と Amazon S3 に保存されているテーブルに対するクエリを実行します。
また、Amazon S3 に保存されているサンプル製品の注文データを使用して、最新のデータを Amazon DynamoDB に保管すると同時に、頻繁にアクセスしなくなった古いデータを Amazon S3 に保存する演習も行います。あまり使用しないデータを Amazon S3 にエクスポートすることで、ストレージコストを削減できるだけでなく、高速データに必要な低レイテンシーアクセスを維持できます。さらに、Amazon S3 にエクスポートされたデータは Amazon EMR で直接クエリ可能です (また、エクスポートしたテーブルは DynamoDB テーブルに結合することもできます)。
使用する DynamoDB テーブル
使用するサンプル注文データには、次のスキーマが設定されています。このデータには、プライマリキーとしての Order ID、Customer ID フィールド、エポック以後の秒数として保存される Order Date、および各注文で顧客が購入する合計金額を表す Total が含まれています。また、このデータには、年ごとおよび月ごとのフォルダによるパーティションもあります。
Orders | Type |
---|---|
Order_ID | String (Primary Key) |
Customer_ID | String |
Order_Date | Bigint |
Total | Double |
2012 年 1 月の DynamoDB テーブル、Orders-2012-01 を作成します。Order_ID がプライマリキーです。月ごとに新しいテーブルを使用することで、低レイテンシーアクセスを必要としなくなったテーブルのデータをエクスポートして、そのようなテーブルを削除することが簡単になります。
[1][AWS マネジメントコンソール] の [サービス] で [DynamoDB] をクリックします。
[2][テーブルの作成] をクリックします。
[3][DynamoDB テーブルの作成] ページで、次の操作を行います。
Orders-2012-01
Order_ID
(文字列)追加のオプションが表示されます。Auto Scaling を使用するのではなく、特定の容量を設定します。
[5][Auto Scaling] セクションで、[読み込みキャパシティー] と [書き込みキャパシティー] をオフにします。
[6][Provisioned capacity] セクションで次のように入力します。
読み込みキャパシティーユニット: 100
書き込みキャパシティーユニット: 100
このサンプルでは、100 ユニットの読み込み容量と書き込み容量があれば十分に足ります。これらの値を設定するときは、Amazon EMR クラスターが大きいほど、それで使用できる容量も大きくなることに留意します。また、この DynamoDB テーブルを利用する他のアプリケーションとの間でこの容量を共有できます。
[Auto Scaling] を活用して、Amazon DynamoDB に、アプリケーショントラフィックの変化に応じて、スループットを自動的に調整させることもできます。詳細については、こちらをご確認ください。[DynamoDB Auto Scaling によるスループットキャパシティの自動管理] http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AutoScaling.html )
DynamoDB テーブルは、ラボの後半で使用します。
ラボのこのセクションでは、Hive クラスターで使用する出力バケットを Amazon S3 に作成します。
Amazon S3 の各バケットには一意の名前が必要になります。そこで、ランダムな数値をバケット名に追加します。
[8][サービス] で [S3] をクリックします。
[9] [バケットを作成する] をクリックします。
[10][バケット名] には、次のように入力します。dynamodb-bucket-123
(123 の部分は乱数で置き換え)
リージョンは変更しません。
SSH クライアントと Amazon EC2 キーペアのプライベートキーを使用して、Amazon EMR マスターノードに接続します。
Amazon EMR クラスターのパブリック DNS 名を取得します。
[12][サービス] から [EMR] をクリックします。
[13][クラスター一覧] ページで、[labcluster] をクリックします。
[14][クラスターの詳細] ページで、[マスターパブリック DNS] がクラスターの詳細の中に表示されます。
[15][マスターパブリック DNS 名] をクリップボードにコピーします。後で使用できるように、テキストエディタに貼り付けます。
この手順は Windows ユーザーのみを対象としています。
Mac または Linux を使用している場合は、このセクションを省略して次のセクションに進んでください。
[16] ラボの起動ボタンを押した画面の左にある、[ PPK形式でダウンロード] をクリックします。
[17] 保存場所を尋ねられた場合は任意のディレクトリを指定して、ファイルを保存します。
PuTTY を使用して Amazon EC2 インスタンスに SSH で接続します。
コンピュータに PuTTY がインストールされていない場合は、ここからダウンロードしてください。
[18] PuTTY.exe を開始します。
[19] 以下のように設定して、PuTTY がタイムアウトしないように設定します。
これで、PuTTY セッションを長時間確立したままにできます。
hadoop
と入力します。これは、EC2 スタンスで使用される通常の ec 2-user ログインとは異なります。
これにより、EC2 インスタンスに接続されます。
Windows ユーザーはこちらをクリックして次のタスクに進んでください。
この手順は Mac または Linux ユーザーのみを対象としています。Windows を使用している場合は、次のタスクに進んでください。
[22] ラボの開始ボンタンを押した画面の左側にある、[ PEM形式でダウンロード] をクリックします。
[23] ディレクトリを指定してファイルを保存します。
[24] 以下のコマンドをテキストエディタにコピーします。
chmod 400 KEYPAIR.pem
ssh -i KEYPAIR.pem hadoop@MASTER-PUBLIC-DNS
以下に例を示します。
chmod 400 Downloads/qwikLABS-L123-1234.pem
ssh -i Downloads/qwikLABS-L123-1234.pem hadoop@ec2-54-111-222-333.us-west-2.compute.amazonaws.com
[26] 編集したコマンドをターミナルウィンドウに貼り付けて実行します。
[27] リモート SSH サーバーへの最初の接続を許可するかを確認するメッセージが表示されたら、yes
と入力します。
ラボのこのセクションでは、インタラクティブな Hive セッションを使用し、Amazon S3 の入力データおよび DynamoDB テーブルとして、外部テーブルリファレンスを作成します。外部テーブルは、既に Amazon S3 に保管されているデータを指します。
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
sudo chown hadoop -R /var/log/hive
mkdir /var/log/hive/user/Hadoop
hive
Hive プロンプトに [hive>] と表示されます。
これで、Amazon EMR クラスターのコマンドシェルで Hive とやり取りできるようになりました。
SET dynamodb.throughput.read.percent=1.0;
このコマンドにより、Hive は使用可能な読み込みスループットを 100% 使用するように指示されます (デフォルトは 50%)。
本稼働環境では、DynamoDB テーブルを使用する他のアプリケーションのパフォーマンスに悪影響を及ぼす可能性があります。ただし、このラボでは、他のユーザーはいないため影響はありません。
CREATE EXTERNAL TABLE orders_s3_export (
order_id string,
customer_id string,
order_date int, total double )
PARTITIONED BY (year string, month string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-3-dynamodb/data/ddb-orders';
Output:
OK
Time taken: 4.256 seconds
このコマンドでは、データの場所、注文データのフィールド、およびフォルダによるパーティションスキームが指定されています。
CREATE EXTERNAL TABLE orders_ddb_2012_01 (
order_id string,
customer_id string,
order_date bigint,
total double )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "Orders-2012-01",
"dynamodb.column.mapping" = "order_id:Order_ID,customer_id:Customer ID,order_date:Order Date,total:Total");
Output:
OK
Time taken: 1.122 seconds
テーブル定義は、DynamoDB テーブル名、DynamoDB ストレージハンドラー、注文フィールド、および EXTERNAL TABLE フィールド (スペースは含められない) と実際の DynamoDB フィールドとのマッピングを指定する必要があります。
このタスクでは、Hive と Amazon EMR を使用して、データを DynamoDB テーブルにロードします。
外部テーブルのデータにアクセスするには、ADD PARTITION コマンドを使用して、作業セットに追加するパーティションを指定する必要があります。2012 年 1 月のデータから始めます。
ALTER TABLE orders_s3_export ADD PARTITION (year='2012', month='01');
Output:
OK
Time taken: 0.62 seconds
この外部テーブルをクエリすると、このパーティションのみが結果に含まれます。
INSERT OVERWRITE TABLE orders_ddb_2012_01
SELECT order_id, customer_id, order_date, total FROM orders_s3_export;
Output:
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
… (lines removed)
Total MapReduce CPU Time Spent: 11 sec 0 msec
OK Time taken: 231.855 seconds
DynamoDB にデータをコピーするには数分かかる場合があります。待っている間、次のステップに進み、既に DynamoDB に移された一部のデータを表示します。
[35][サービス] で [DynamoDB] をクリックします。
[36] ナビゲーションペインで [テーブル] をクリックします。
[37][Orders-2012-01] テーブル、[項目] タブの順にクリックします。注文データが Amazon EMR によって Amazon DynamoDB にロードされることを確認できます。
このタスクでは、Hive と Amazon EMR を使用してデータをクエリします。まず、5 つの主要顧客の Amazon DynamoDB データをクエリします。次に、Amazon S3 の履歴データをクエリし、過去数か月に主要な顧客が費やした金額を算定します。
SELECT
customer_id,
sum(total) spend,
count(*) order_count
FROM orders_ddb_2012_01
WHERE order_date >= unix_timestamp('2012-01-01', 'yyyy-MM-dd')
AND order_date < unix_timestamp('2012-01-08', 'yyyy-MM-dd')
GROUP BY customer_id
ORDER BY spend desc LIMIT 5;
このコマンドでは [unix_timestamp] 機能が使用されています。これは、order_date がエポック以来の秒数として保存されているためです。
これには最大で 2 分かかる場合があります。
Output:
Total MapReduce jobs = 2
…(lines removed)
Total MapReduce CPU Time Spent: 32 sec 90 msec
OK
c-2cC5fF1bB 11651.26 22
c-2cC4eE3dD 11544.099999999999 18
c-3dD3dD3dD 11511.06 20
c-1bB2cC1bB 11419.94 20
c-3dD0aA4eE 11414.2 17
Time taken: 79.492 seconds, Fetched: 5 row(s)
上記の結果は、顧客 [c-2cC5fF1bB] がその週に一番多くの金額を費やしたことを示しています。
MSCK REPAIR TABLE orders_s3_export;
Output:
OK
Partitions not in metastore: orders_s3_export:year=2011/month=01 orders_s3_export:year=2011/month=02 orders_s3_export:year=2011/month=03 orders_s3_export:year=2011/month=04
… (Lines removed)
Repair: Added partition to metastore orders_s3_export:year=2011/month=08
Repair: Added partition to metastore orders_s3_export:year=2011/month=09
Repair: Added partition to metastore orders_s3_export:year=2011/month=10
Repair: Added partition to metastore orders_s3_export:year=2011/month=11
Repair: Added partition to metastore orders_s3_export:year=2011/month=12
Time taken: 4.145 seconds, Fetched: 13 row(s)
これで、Amazon S3 の履歴データをクエリして、顧客「c-2cC5fF1bB」が 2011 年の後半 6 か月に購入した内容を照会することができます。
SELECT
year,
month,
customer_id,
sum(total) spend,
count(*) order_count
FROM orders_s3_export
WHERE customer_id = 'c-2cC5fF1bB'
AND month >= 6 AND year = 2011
GROUP BY customer_id, year, month
ORDER by month desc;
パーティションフィールド ( 月と年 ) を Hive クエリで使用できることを確認します。
Output:
Total MapReduce jobs = 2
… (lines removed)
MapReduce Jobs Launched:
Job 0: Map: 7 Reduce: 1 Cumulative CPU: 33.0 sec HDFS Read: 1631 HDFS Write: 418 SUCCESS
Job 1: Map: 1 Reduce: 1 Cumulative CPU: 3.46 sec HDFS Read: 802 HDFS Write: 293 SUCCESS
Total MapReduce CPU Time Spent: 36 seconds 460 msec OK
2011 12 c-2cC5fF1bB 23893.80999999999 49
2011 11 c-2cC5fF1bB 24241.250000000004 50
2011 10 c-2cC5fF1bB 33559.009999999995 56
2011 09 c-2cC5fF1bB 27060.090000000004 53
2011 08 c-2cC5fF1bB 24490.829999999998 48
2011 07 c-2cC5fF1bB 18407.420000000002 42
2011 06 c-2cC5fF1bB 23821.030000000002 51
Time taken: 52.642 seconds, Fetched: 7 row(s)
このタスクでは、DynamoDB のデータを Amazon S3 にエクスポートします。あまり使用しない Amazon DynamoDB データを Amazon S3 にエクスポートすることで、ストレージを削減できるだけでなく、高速データに必要な低レイテンシーアクセスを維持できます。さらに、Amazon S3 にエクスポートされたデータは Amazon EMR で直接クエリできます (また、エクスポートしたテーブルは DynamoDB テーブルに結合することもできます)。
まず、Amazon S3 バケットの EXTERNAL TABLE を作成します。再度、年と月でパーティション分割されたことに注意してください。
CREATE EXTERNAL TABLE orders_s3_new_export (
order_id string,
customer_id string,
order_date int,
total double )
PARTITIONED BY (year string, month string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://dynamodb-bucket-123/output';
INSERT OVERWRITE TABLE orders_s3_new_export
PARTITION (year='2012', month='01')
SELECT * from orders_ddb_2012_01;
テーブルの月と年の適切なパーティションの値がクエリに提供されることに注意してください。
Output:
Total MapReduce jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
…(lines removed)
Table default.orders_s3_new_export stats: [num_partitions: 1, num_files: 1, num_rows: 0, total_size: 427731, raw_data_size: 0]
10000 Rows loaded to orders_s3_new_export
MapReduce Jobs Launched:
Job 0: Map: 3 Cumulative CPU: 24.0 sec HDFS Read: 914 HDFS Write: 427731 SUCCESS
Job 1: Map: 1 Cumulative CPU: 4.47 sec HDFS Read: 428297 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 28 seconds 470 msec
Time taken: 38.599 seconds
このタスクでは、Amazon S3 の出力を確認します。
[43][サービス] で [S3] をクリックします。
[44][dynamodb-bucket-xxx] バケットをクリックしてから [output] をクリックします。
[45][year=2012 > month=01] の順にクリックして、データの詳細を表示します。データファイルがラボの出力に対応していることを確認する必要があります。データファイルをダウンロードし、テキストエディタで開きます。
お疲れ様でした。 このラボを完了しました。以下の手順に従って、ラボ環境をクリーンアップします。
[46] AWS マネジメントコンソールからサインアウトするには、コンソール上部のメニューバーで [awsstudent] をクリックし、[サインアウト] をクリックします。
[47] Qwiklabs ページで [ラボを終了] をクリックします。