エキサイト株式会社の中尾です。
Embulkを使って、S3のアクセスログを集計してデータベースに挿入する方法を説明します。
例えばアクセスログから時間帯別のアクセスログの結果をDBに保存したりすることがあると思います。
前提条件は以下の通りです。
AWS Athenaのクエリ
例えば、時間帯別のアクセスログを取得するクエリです。
SELECT date_trunc('hour', from_iso8601_timestamp(time) AT TIME ZONE 'UTC') AS hour, COUNT(*) AS access_count FROM your_nginx_access_log_table WHERE time between '2023-06-25 00:00:00' and '2023-06-25 23:59.59' GROUP BY hour ORDER BY hour
実行例です。時間ごとに集計されています。 こちらをデータベースに入れます。
hour | access_count |
---|---|
2023-06-25 00:00:00 | 125 |
2023-06-25 01:00:00 | 98 |
2023-06-25 02:00:00 | 76 |
2023-06-25 03:00:00 | 104 |
2023-06-25 04:00:00 | 82 |
2023-06-25 05:00:00 | 135 |
2023-06-25 06:00:00 | 172 |
2023-06-25 07:00:00 | 204 |
2023-06-25 08:00:00 | 290 |
2023-06-25 09:00:00 | 350 |
Embulkのプラグインインストール
まずEmbulkのinputプラグインにembulk-input-athenaをいれます。 outputプラグインには例としてembulk-output-postgresqlを入れます。
embulk gem install embulk-input-athena embulk gem install embulk-output-postgresql
Embulkの設定
Embulkに使うyamlは以下の通りです。 ポイントは以下です。
- WITH句を使うとコードが見やすい
- Liquid Template Languageが使えるので、アクセスキーやデータの取得期間を動的に変えられる
in: type: athena database: default athena_url: {{ env.ATHENA_URL }} s3_staging_dir: {{ env.S3_STAGING_DIR }} access_key: {{ env.ACCESS_KEY }} secret_key: {{ env.SECRET_KEY }} query: | WITH access_log_hour_count AS ( -- 時間帯別アクセス数 SELECT date_trunc('hour', from_iso8601_timestamp(time) AT TIME ZONE 'UTC') AS hour, COUNT(*) AS access_count FROM your_nginx_access_log_table WHERE time between {{ env.START_DATE }} and {{ env.END_DATE }} GROUP BY hour ORDER BY hour ) SELECT hour, access_count FROM access_log_hour_count columns: - {name: hour, type: timestamp, format: '%Y-%m-%d %H:%i:%s'} - {name: count, type: long} null_to_zero: true #out: # type: stdout out: type: postgresql host: {{ env.DB_HOST }} database: {{ env.DB_DATABASE }} user: {{ env.DB_USER }} password: {{ env.DB_PASSWORD }} table: access_log_hour_count mode: truncate_insert default_timezone: 'Asia/Tokyo' column_options: hour: {value_type: string, timestamp_format: '%Y-%m-%d %H:%i:%s'}
実行する例
### 環境変数は適宜入れてください export START_DATE= export END_DATE= export DB_USER= export DB_DATABASE= export DB_PASSWORD= export DB_HOST= export ATHENA_URL= export S3_STAGING_DIR= export ACCESS_KEY= export SECRET_KEY= ### 環境変数を読み込み実行します。 embulk run access_log_hour_count.yml.liquid
最後に
これでアクセスログがデータベースに入りました。 自前でバッチを作るよりも見やすく、そして管理しやすいですね。 よかったら参考になればと思います。