ラボ 4 - Amazon EMR での Hive を使ったサーバーログの処理

このラボでは、Apache Hive および Amazon EMR を使用して、オンライン広告をサポートするサーバーから Amazon S3 にアップロードされたログを処理します。ログが処理された後、その結果を示すデータは、クエリ可能な集合として、Amazon S3 に保持されているリレーショナルデータベースのテーブルに保存されます。

目標

このラボを完了すると、以下の操作を実行できるようになります。

所要時間

このラボは、修了までに60 分かかります。

注意


ラボのシナリオ

あるインターネット広告会社が、Hive クラスター (Hive および Amazon EMR を使用する Hadoop クラスター) によって生成されたデータが入力されているデータウェアハウスを運用しています。Amazon EC2 インスタンスでは、広告インプレッション、および広告サイトへのリダイレクトのクリックを提供します。各 Amazon EC2 インスタンスでは、インプレッションとクリックストリームデータを Amazon S3 にプッシュされたログファイルに保存します。

Amazon EC2 インスタンスでは、2 種類のログファイルを作成します。インプレッションログとクリックログです。広告がユーザーに表示されるたびに、エントリがインプレッションログに追加されます。ユーザーが広告をクリックするたびに、エントリがクリックストリーミングログに追加されます。

この環境で、Amazon EC2 インスタンスは、最新のログのセットを 5 分おきに Amazon S3 にプッシュします。これによりタイムリーなログ分析が可能になります。以下は Amazon S3 に保存されたインプレッションログの例です。

/tables/impressions/dt=2009-04-13-08-05/ec2-47-98-45-77.amazon.com-2009-04-13-08-05.log

この例では、ログデータは [/awsu-ilt/AWS-200-BIG/v3.0/data/lab-4-hive/data/tables/impressions/] というフォルダにある [/us-west-2-aws-training/] という名前の Amazon S3 バケットに保存されています。[/impressions/] フォルダにはパーティション分割されたデータがあります (パーティション分割されたデータファイルを使用すると、クエリのときに、関係のあるデータのみを Amazon S3 からダウンロードすることが可能になるため、クエリのパフォーマンスが向上します)。パーティション分割されたテーブルの命名構文は [Partition column]=[Partition value] です。例えば、dt=2009-04-13-05 の場合、dt (日付/時間) は Partition column (パーティション列) の名称で、2009-04-13-05 は Partition value (パーティション値) です。


AWS マネジメントコンソールにアクセスする

このラボの間は、リージョンを変更しないでください。


タスク 1: Amazon EMR クラスターの起動

このラボでは、マスターノードにインストールされた Hive を使用して Amazon EMR クラスターを作成します。Hive を使用し、HiveQL、SQL のようなクエリ言語、を用いてデータをロードおよびクエリします。Hive では、クエリを分散 MapReduce アプリケーションに変換します。これらのアプリケーションは、マスターノードによってコアノードに分散されます。これは分散処理能力を利用して、データを保存およびクエリするのに便利です。その際、MapReduce アプリケーションを Java などのプログラミング言語で作成する必要はありません。Amazon EMR を使用して DynamoDB とデータをやり取りする場合にも、Hive をインストールする必要があります。

クラスターを作成する前に、Amazon S3 バケットを作成して出力データとログを保存します。入力データ (ログ) を含む Amazon S3 バケットは、既にお客様用に作成され、AWS トレーニング部門にホストされています。出力バケットを作成した後、Amazon EMR クラスターをインストール済みの Hive で起動します。

一時ファイルを使用することなく、Amazon S3 に対してデータの読み書きができるのは、Hive と MapReduce の AWS バージョンに備わっている機能です。これによりパフォーマンスは大幅に向上しますが、HDFS と Amazon S3 はこの環境では動作が異なります。Amazon S3 は、Amazon EMR 向けの入出力リポジトリ (MapReduce 処理フェーズに必要な HDFS) として使用するか、あるいは HDFS の置き換えとして使用するかのいずれか 1 つの方法で Amazon EMR とともに使用できます。Amazon S3 でデータを維持することにより、データの再利用が可能です。

タスク 1.1: 出力バケットの作成

このタスクでは、出力データおよびログ用の出力バケットを作成します。

Amazon S3 の各バケットには一意の名前が必要になります。そこで、ランダムな数値をバケット名に追加します。

  1. [1][AWS マネジメントコンソール] の [サービス] で [S3] をクリックします。

  2. [2] [バケットを作成する] をクリックします。

  3. [3][バケット名] には、次のように入力します。hive-bucket-123 (123 の部分は乱数で置き換え)

  4. [4][作成] をクリックします。

タスク 1.2: Amazon EMR クラスターの起動

このタスクでは、Hive をインストールした Amazon EMR クラスターを起動します。

  1. [5][サービス] から [EMR] をクリックします。

  2. [6][クラスターを作成] をクリックします。

  3. [7] 画面の上部で、[クラスターの作成 – クイックオプション] の横にある [詳細オプションに移動する] をクリックします。

  4. [8][ステップ 1: ソフトウェアおよびステップ] ページの [ソフトウェア設定] セクションで次のように指定します。

画面の下部で [最後のステップが完了したらクラスターを自動的に終了します] チェックボックスがオフになっていることを確認し、長時間稼働するクラスターを作成します。

  1. [9][次へ] をクリックします。

  2. [10][ステップ 2: ハードウェア] ページの [ハードウェア構成] セクションで次のように指定します。

クラスターには、1 つのマスターインスタンスと 2 つのコアインスタンスが付与されます。

  1. [11][次へ] をクリックします。

  2. [12][ステップ 3: クラスター全般設定] ページの [全般オプション] セクションの設定は以下のとおりです。

  1. [13][次へ] をクリックします。

  2. [14][ステップ 4: セキュリティ] ページで、[EC2 キーペア] に、[qwikLABS] で生成されたキーペアを選択します。

  3. [15][アクセス権限] で [カスタム] をクリックします。

  4. [16][クラスターを作成] をクリックします。

これでクラスターが作成されます。クラスターを待っている間、セキュリティグループ設定を更新して、マスターノードへの SSH アクセスを許可することができます。

  1. [17][セキュリティとアクセス] の下、[マスターのセキュリティグループ] の横にある、マスターのセキュリティグループのリンク (ElasticMapReduce-master) をクリックします。リンクが表示されていない場合は、画面右上のリロードボタンをクリックして画面をリフレッシュします。

新しいタブが開き、セキュリティグループの設定ページが表示されます。

  1. [18] [ElasticMapReduce-master] を選択します。

  2. [19] 下部のペインで、[インバウンド] タブをクリックします。

  3. [20][編集] をクリックします。

  4. [21] 下へスクロールし、[ルールの追加] をクリックし、以下のように設定します。

これで EMR コンソールに戻ることができます。

  1. [22] 現在のウェブブラウザタブを閉じ、Amazon EMR ウェブブラウザタブに戻ります。

  2. [23] EMR クラスターのステータスが [待機中] に変わるまで待ちます。

ステータスが [開始中] から [実行中]、[待機中] へと変化します。

これには最大で10 分かかります。更新 を数分ごとにクリックして、ステータスを更新します。


タスク 2: SSH を使用して Hadoop マスターノードに接続する

SSH クライアントと Amazon EC2 キーペアのプライベートキーを使用して、Amazon EMR マスターノードに接続します。

EMR クラスターのパブリック DNS 名を取得する

EMR クラスターのパブリック DNS 名を取得します。

  1. [24][サービス] から [EMR] をクリックします。

  2. [25][クラスター一覧] ページで [Hive EMR cluster] をクリックします。

  3. [26][クラスター] ページに、[マスターパブリック DNS] が表示されます。

  4. [27][マスターパブリック DNS 名] をクリップボードにコピーします。後で使用できるように、テキストエディタに貼り付けます。

Windows ユーザー: SSH を使って接続する

この手順は Windows ユーザーのみを対象としています。

Mac または Linux を使用している場合は、このセクションを省略して次のセクションに進んでください

  1. [28] ラボの開始ボンタンを押した画面の左側にある、[ PPK形式でダウンロード] をクリックします。

  2. [29] 保存場所を尋ねられた場合は任意のディレクトリを指定して、ファイルを保存します。

PuTTY を使用して Amazon EC2 インスタンスに SSH で接続します。

コンピュータに PuTTY がインストールされていない場合は、ここからダウンロードしてください。

  1. [30] PuTTY.exe を開きます。

以下のようにして、PuTTY がタイムアウトしないように設定します。

これで、PuTTY セッションを長時間確立したままにできます。

  1. [32] PuTTY セッションの設定を行います。
  1. [33][ユーザー名] の入力を求められたら、hadoopと入力します。

これは、EC2 スタンスで使用される通常の ec 2-user ログインとは異なります。

これにより、EC2 インスタンスに接続されます。

Windows ユーザーはこちらをクリックして次のタスクに進んでください。

Mac および Linux ユーザー

この手順は Mac または Linux ユーザーのみを対象としています。Windows を使用している場合は、次のタスクに進んでください。

  1. [34] ラボの開始ボンタンを押した画面の左側にある、[ PEM形式でダウンロード] をクリックします。

  2. [35] ディレクトリを指定してファイルを保存します。

  3. [36] 以下のコマンドをテキストエディタにコピーします。

chmod 400 KEYPAIR.pem ssh -i KEYPAIR.pem hadoop@MASTER-PUBLIC-DNS
  1. [37] 編集したコマンドをターミナルウィンドウに貼り付けて実行します。

  2. [38] リモート SSH サーバーへの最初の接続を許可するかどうかを確認するメッセージが表示されたら、yesと入力します。

認証にキーペアを使用しているため、パスワードの入力は要求されません。


タスク 3: Hive をインタラクティブに実行する

このタスクでは、シンプルなデータウェアハウスの Hive テーブルを作成します。Hive テーブルを作成する前に、マスターノードでインタラクティブな Hive セッションを開始し、JSON シリアライザー/デシリアライザー (SerDe) を参照してログコンテンツを読み込みます。

  1. [39] 次のようなメッセージは、マスターノードへの接続が成功したことを示します。
__| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
  1. [40] 以下のコマンドを SSH ウィンドウに貼り付け、Hive で使用されるログディレクトリを作成します。
sudo chown hadoop -R /var/log/hive mkdir /var/log/hive/user/hadoop
  1. [41] このコマンドをテキストエディタへコピーします

    注意:使用する前にバケット名を編集してから実行します

hive -d SAMPLE=s3://aws-tc-largeobjects/AWS-200-BIG/v3.1/lab-4-hive/data -d DAY=2009-04-13 -d HOUR=08 -d NEXT_DAY=2009-04-13 -d NEXT_HOUR=09 -d OUTPUT=s3://hive-bucket-123/output/

このコマンドは、入出力のロケーションや日時などの、クエリに使用する変数を hive へ渡します。

  1. [42][hive-bucket-123] を、このラボの最初に使用したバケット名で置き換えます。

  2. [43] コマンドを SSH ウィンドウに貼り付けます。

[hive>] プロンプトが表示されます。

Hive を使用してテーブルを作成する

これから、Amazon S3 のログファイルから Hive テーブル (インプレッションおよびクリック) を作成します。これらのテーブルはジョブの入力データです。

  1. [44] この Hive ステートメントを実行し、Amazon S3 に保存されているログから外部インプレッションテーブルを作成します。
CREATE EXTERNAL TABLE impressions ( requestBeginTime string, adId string, impressionId string, referrer string, userAgent string, userCookie string, ip string) PARTITIONED BY (dt string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' with serdeproperties ('paths'='requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip') LOCATION '${SAMPLE}/tables/impressions';

Output:

OK Time taken: 0.321 seconds

このテーブルのデータは Amazon S3 に存在します。[CREATE EXTERNAL TABLE] ステートメントでは、HDFS にデータを保存しません (CREATE TABLE ステートメントが保存します)。このステートメントは、データがある場所とテーブル列が構成される方法について Hive に伝えます。テーブルがクエリされた場合、Hive は Hadoop を使用してテーブルを読み込みます。また、外部テーブルが削除される場合、そのデータは Amazon S3 から削除されません。

パーティションは、クエリのパフォーマンスやデータ管理のしやすさを向上させるために、論理的に整理してデータを保存する 1 つの方法です。通常、パーティションを用いてデータを分割すると、特定のデータサブセットへのアクセスをより迅速に行えます。例えば、これらのログに対するクエリはほとんどの場合、タイムスタンプに基づいています。つまりこのデータは時間に基づいて分割されます。ただし、Hive はどのパーティションがテーブルに存在するかをまだ認識していません。

  1. [45] このコマンドを実行し、入力データを検査してすべてのパーティションを検出します。
set hive.msck.path.validation=ignore; MSCK REPAIR TABLE impressions;

Output:

OK Partitions not in metastore: impressions:dt=2009-04-12-13-00 impressions:dt=2009-04-12-13-05 ...(LINES REMOVED) Repair: Added partition to metastore impressions:dt=2009-04-14-13-00 Time taken: 8.942 seconds, Fetched: 242 row(s)

MSCK REPAIR TABLE は、存在しているものの現時点でリストされていない (Amazon S3 のパーティションを含む) 任意のパーティションをメタデータストアに情報として追加します。このステートメントは Hive の Amazon バージョンの拡張です。

  1. [46] 以下のコマンドを実行し、Amazon S3 に保存されているクリックストリームログから外部クリックテーブルを作成します。
CREATE EXTERNAL TABLE clicks (impressionId string) PARTITIONED BY (dt string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' WITH SERDEPROPERTIES ('paths'='impressionId') LOCATION '${SAMPLE}/tables/clicks';
  1. [47] 以下のコマンドを実行して、クリックストリームログデータから、すべてのパーティションを返します。
MSCK REPAIR TABLE clicks;
  1. [48] 以下のステートメントを入力して、テーブルの作成を確認します。
show tables;

出力では、Amazon S3 に保存された、クリックとインプレッションの両方の外部テーブルが返されます。

Output:

OK clicks impressions Time taken: 0.088 seconds, Fetched: 2 row(s)

タスク 4: Hive を使用してテーブルを結合する

ラボのこのセクションでは、インプレッションテーブルをクリックテーブルと結合します。ユーザーに提示される広告情報 (インプレッションデータ) と、ユーザーが広告をクリックすることによりもたらされた広告結果の情報 (クリックストリームデータ) を組み合わせることにより、ユーザーの行動を深く理解し、広告サービスのターゲット設定と収益化が可能になります。

クリックテーブルとインプレッションテーブルは両方ともパーティション分割されたテーブルです。2 つのテーブルを結合するとき、[CREATE TABLE] コマンドで、新しいテーブルをパーティション分割するように指示します。

タスク 4.1: テーブルの結合

ラボのこのセクションでは、インプレッションテーブルをクリックテーブルと結合します。

  1. [49] 以下のコマンドを実行して、外部結合テーブルを作成します。
CREATE EXTERNAL TABLE joined_impressions ( requestBeginTime string, adId string, impressionId string, referrer string, userAgent string, userCookie string, ip string, clicked Boolean) PARTITIONED BY (day string, hour string) STORED AS SEQUENCEFILE LOCATION '${OUTPUT}/tables/joined_impressions';

STORED AS SEQUENCEFILE 句は、指定されたロケーションでデータを圧縮するという意味です。この標準的な Hadoop 形式で圧縮を実行することにより、JSON 形式より優れたパフォーマンスが提供され、SerDe で JSON データを読み込む必要はなくなります。

  1. [50] 以下のコマンドを実行し、ジョブフローのローカル HDFS パーティションに一時テーブル (tmp_impressions) を作成し、一定期間、中間データを保存します。
CREATE TABLE tmp_impressions ( requestBeginTime string, adId string, impressionId string, referrer string, userAgent string, userCookie string, ip string) STORED AS SEQUENCEFILE;
  1. [51] 以下のコマンドを実行して、参照されている期間のインプレッションログデータを挿入します。
INSERT OVERWRITE TABLE tmp_impressions SELECT from_unixtime(cast((cast(i.requestBeginTime as bigint) / 1000) as int)) requestBeginTime, i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, i.ip FROM impressions i WHERE i.dt >= '${DAY}-${HOUR}-00' AND i.dt < '${NEXT_DAY}-${NEXT_HOUR}-00';

インプレッションテーブルがパーティション分割されるため、一定期間に関係するパーティションのみが読み込まれます。

期間の開始は [DAY-HOUR] で、期間の終了は [NEXT_DAY-NEXT_HOUR] です。[NEXT_DAY] は、次の期間の日付です。1 日の終わりに処理を実行している場合のみ、[${DAY}] とは異なります。この場合、期間はその次の日に終了します。

[tmp_impressions] テーブルにデータが既に含まれている場合、[INSERT OVERWRITE TABLE] ではテーブルのすべてのデータの削除、置き換えを実行します。この場合、ログデータがクエリされ、HDFS に書き込まれるまで待機する必要があります。

Output:

Query ID = hadoop_20180207071847_ab90b14a-d3d6-4070-af2b-33c6937c889d Total jobs = 1 Launching Job 1 out of 1 Tez session was closed.Reopening... Session re-established. Status: Running (Executing on YARN cluster with App id application_1517985610951_0002) Loading data to table default.tmp_impressions OK Time taken: 32.499 seconds
  1. [52] 以下のステートメントを実行して、(tmp_clicks の名前の) 一時クリックテーブルを HDFS に作成します。
CREATE TABLE tmp_clicks ( impressionId string ) STORED AS SEQUENCEFILE;
  1. [53] 以下のステートメントを実行し、データを tmp_clicks テーブルに挿入します。
INSERT OVERWRITE TABLE tmp_clicks SELECT impressionId FROM clicks c WHERE c.dt >= '${DAY}-${HOUR}-00' AND c.dt < '${NEXT_DAY}-${NEXT_HOUR}-20';

クリックテーブルでは、期間は 20 分間まで延長されます。これにより、インプレッション後、最大 20 分間のクリックの実行、およびクリックの結果がなかったどのインプレッションの保持も可能になります。

Output:

Total jobs = 1 Launching Job 1 out of 1 Status: Running (Executing on YARN cluster with App id application_1517985610951_0002) Loading data to table default.tmp_clicks OK Time taken: 16.711 seconds

出力ストリームの終了時には、[tmp_clicks] テーブルにロードされる行の数値、および HDFS の読み取りと書き込みの数値に注意します。

  1. [54][tmp_clicks] および [tmp_impressions] の LEFT OUTER JOIN を作成して、S3 出力バケットの [joined_impressions] テーブルに結果のデータセットを書き込むには、以下をコピーします。
INSERT OVERWRITE TABLE joined_impressions PARTITION (day='${DAY}', hour='${HOUR}') SELECT i.requestBeginTime, i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, i.ip, (c.impressionId is not null) clicked FROM tmp_impressions i LEFT OUTER JOIN tmp_clicks c ON i.impressionId=c.impressionId;

LEFT OUTER JOIN では、[impressions] からすべてのレコードが返されます。これは結合条件が一致するレコードが [clicks] テーブルにない場合でも実行されます。一致するレコードが [clicks] 内に見つからない場合、NULL 値が各列に返されます。

LEFT OUTER JOIN では、[impressions] テーブル (クリック結果が得られたか得られなかったかのインプレッションレコード) にすべてのデータを保持します。LEFT OUTER JOIN では、選択された期間内のインプレッションから生じていない [clicks] のデータは除外されます。[joined_impressions] テーブルは Amazon S3 に保存されるため、出力は他のクラスターで利用可能です。

Output:

Total jobs = 1 Launching Job 1 out of 1 Status: Running (Executing on YARN cluster with App id application_1517985610951_0002) Loading data to table default.joined_impressions partition (day=2009-04-13, hour=08) OK Time taken: 17.382 seconds
  1. [55] 以下のステートメントを実行し、[joined_impressions] をクエリします。(** 注 ** 10-limit パラメータは、結果セットを 10 個のレコードまでに制限します)。
SELECT * FROM joined_impressions LIMIT 10;

Output:

OK 2009-04-13 08:04:32 UJQHtEBXjvN5fte0ehWtSt0QaeKFKp wFcgn02kwi7MumbnjflD1LfMT4tgbG cartoonnetwork.com Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; FunWebProducts; GTB6; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET ...(lines removed) r2k96t1CNjSU9fJKNdiRdfn9MdWNFn 71.124.66.3 false 2009-04-13 08 Time taken: 0.065 seconds, Fetched: 10 row(s)
  1. [56] ブラウザの [AWS マネジメントコンソール] に戻ります。

  2. [57][サービス] で [S3] をクリックします。

  3. [58][hive-bucket-xxx] バケットをクリックします。

  4. [59][output > tables > joined_impressions > day=2009-04-13 > hour=08] をクリックします。

結果テーブルのデータが、S3 オブジェクトとして保存されることを確認します。

このファイルは、Hadoop によって解釈されることができるシーケンス形式であり、通常のテキストファイルではありません。


ラボを終了する

お疲れ様でした。 このラボを完了しました。以下の手順に従って、ラボ環境をクリーンアップします。

  1. [60] AWS マネジメントコンソールからサインアウトするには、コンソール上部のメニューバーで [awsstudent] をクリックし、[サインアウト] をクリックします。

  2. [61] Qwiklabs ページで [ラボを終了] をクリックします。