Embulkを使って、アクセスログを集計してデータベースに挿入する

エキサイト株式会社の中尾です。

Embulkを使って、S3のアクセスログを集計してデータベースに挿入する方法を説明します。

例えばアクセスログから時間帯別のアクセスログの結果をDBに保存したりすることがあると思います。

前提条件は以下の通りです。

  • AWS Athenaを使ってクエリが実行できる
  • nginxのアクセスログがS3に保存されている

AWS Athenaのクエリ

例えば、時間帯別のアクセスログを取得するクエリです。

SELECT
  date_trunc('hour', from_iso8601_timestamp(time) AT TIME ZONE 'UTC') AS hour,
  COUNT(*) AS access_count
FROM
  your_nginx_access_log_table
WHERE
  time between '2023-06-25 00:00:00' and '2023-06-25 23:59.59'
GROUP BY
  hour
ORDER BY
  hour

実行例です。時間ごとに集計されています。 こちらをデータベースに入れます。

hour access_count
2023-06-25 00:00:00 125
2023-06-25 01:00:00 98
2023-06-25 02:00:00 76
2023-06-25 03:00:00 104
2023-06-25 04:00:00 82
2023-06-25 05:00:00 135
2023-06-25 06:00:00 172
2023-06-25 07:00:00 204
2023-06-25 08:00:00 290
2023-06-25 09:00:00 350

Embulkのプラグインインストール

まずEmbulkのinputプラグインにembulk-input-athenaをいれます。 outputプラグインには例としてembulk-output-postgresqlを入れます。

embulk gem install embulk-input-athena
embulk gem install embulk-output-postgresql

github.com

Embulkの設定

Embulkに使うyamlは以下の通りです。 ポイントは以下です。

  • WITH句を使うとコードが見やすい
  • Liquid Template Languageが使えるので、アクセスキーやデータの取得期間を動的に変えられる
in:
  type: athena
  database: default
  athena_url: {{ env.ATHENA_URL }}
  s3_staging_dir: {{ env.S3_STAGING_DIR }}
  access_key: {{ env.ACCESS_KEY }}
  secret_key: {{ env.SECRET_KEY }}
  query: |
    WITH access_log_hour_count AS (
      -- 時間帯別アクセス数
      SELECT
        date_trunc('hour', from_iso8601_timestamp(time) AT TIME ZONE 'UTC') AS hour,
        COUNT(*) AS access_count
      FROM
        your_nginx_access_log_table
      WHERE
        time between {{ env.START_DATE }} and {{ env.END_DATE }}
      GROUP BY
        hour
      ORDER BY
        hour
    )

    SELECT
        hour,
        access_count
    FROM
        access_log_hour_count

  columns:
    - {name: hour, type: timestamp, format: '%Y-%m-%d %H:%i:%s'}
    - {name: count, type: long}
  null_to_zero: true
#out:
#  type: stdout
out:
  type: postgresql
  host: {{ env.DB_HOST }}
  database: {{ env.DB_DATABASE }}
  user: {{ env.DB_USER }}
  password: {{ env.DB_PASSWORD }}
  table: access_log_hour_count
  mode: truncate_insert
  default_timezone: 'Asia/Tokyo'
  column_options:
    hour: {value_type: string, timestamp_format: '%Y-%m-%d %H:%i:%s'}

実行する例

### 環境変数は適宜入れてください
export START_DATE=
export END_DATE=
export DB_USER=
export DB_DATABASE=
export DB_PASSWORD=
export DB_HOST=
export ATHENA_URL=
export S3_STAGING_DIR=
export ACCESS_KEY=
export SECRET_KEY=

### 環境変数を読み込み実行します。
embulk run access_log_hour_count.yml.liquid

最後に

これでアクセスログがデータベースに入りました。 自前でバッチを作るよりも見やすく、そして管理しやすいですね。 よかったら参考になればと思います。