AWS DMSを使ってRDS変更差分をBIに毎時で反映させた話

こんにちは。エキサイトでエンジニアをしている吉川です。

エキサイトホールディングス Advent Calendar 4日目の記事になります。

AWS DMSとは

公式ドキュメント によると

AWS Database Migration Service (AWS DMS) は、リレーショナルデータベース、データウェアハウス、NoSQL データベース、他の種類のデータストアを移行しやすくするクラウドサービスです。

とあります。今回はRDSからS3にデータを流すために使用しましたことを書いていきます。S3に流したあとはGlue CrawlerやAthenaを使って前処理をした後、BIツール(QuickSight)を使ってデータの可視化を行いました。

以前はRDSスナップショットのS3エクスポート機能を使ってにデータを流すようにしていましたが、DMSを使うようにしたことでQuickSight反映までが高速化し、毎時で更新することが可能になりました。

何が嬉しいか

  • 変更差分ファイルだけが高速で作られる
    • DMSを設定しておくと、RDSに変更があった場合数秒で変更差分ファイルが作られます。RDSスナップショットの場合はエクスポートの度にテーブルの中を全て出力するので、時間がかかってしまいます。
    • テーブルの1レコードが変更される度にファイルが作られるので、前処理に一手間はかかるのですが(詳しくは後述)、元々前処理は行う前提だったので吸収できるものと考えました。
  • 特定データベース、テーブルに絞って出力できる
    • RDSスナップショットのS3エクスポート機能では特定データベース、テーブルに絞ってエクスポートが可能ですが、DMSにも同様の機能があります。スナップショット自体は絞り込みができないので、DMSの方がより最小構成に絞って管理が楽になると感じています。

実際にやってみる

ソースとターゲットのエンドポイントを作成する

  1. 「DMS > エンドポイント > エンドポイントの作成」からソースエンドポイントとターゲットエンドポイントをそれぞれ作成

    • 今回はソースエンジンにAmazon Aurora MySQL、ターゲットエンジンにAmazon S3を設定します。
    • エンドポイント設定 を行うことで、parquet形式でエクスポートできます。(詳しくは後述)
    • 各エンドポイントの接続テストも可能です。
  2. 「DMS > レプリケーションインスタンス」からレプリケーションインスタンスを作成

    • インスタンスクラスによって料金が異なります。セオリー通り最小インスタンスで試してから、必要に応じて上げていきましょう。
    • マルチAZを設定することもできます。
    • エンジンバージョンは現在最新の3.4.7以降ではVPC設定かパブリックルーティング設定が必要になります。(実質VPC設定が必須だと思います)
  3. 「DMS > データベース移行タスク」からデータ移行タスクを作成

    • 上記で作成したエンドポイント、レプリケーションインスタンスを選択します。
    • 今回は移行タイプを「既存のデータを移行し、継続的な変更をレプリケートする」に設定しましたが、既存データの移行だけ、または変更のレプリケートだけに設定することも可能です。
    • テーブルマッピングの設定を行うと、設定したデータベースやテーブルに絞って出力することが可能です。
  4. 以上が設定できたら、データベース移行タスクのアクションから実行すると、既存データの移行->変更がある度に差分ファイルの出力、という動きができます

S3に出力されるファイルとその読み込み

S3に出力されるファイル名には2パターンあり、

  • 既存のデータの場合:「LOAD00000001.csv」と固定のファイル名
  • 変更差分の場合:「20221204-093050123.csv」とタイムスタンプになっているファイル名

となります。またファイルには既存テーブルのカラムに加えて「op」というカラムが追加されており、「I(追加)」、「U(更新)」、「D(削除)」のいずれかが入ります。

今回はS3のターゲットエンドポイント設定に以下を追加しています。

{
    "CompressionType": "GZIP", # GZIP形式で圧縮
    "DataFormat": "parquet", # csvではなくparquet形式にする
    "EnableStatistics": false, # 統計情報を含めない
    "IncludeOpForFullLoad": true, # 既存データもopカラムを含める
    "TimestampColumnName": "time_stamp", # 指定したカラム名でタイムスタンプを含める
    "DatePartitionEnabled": false # 日付によるS3パーティション分割は行わない
}

こうして作成されたファイルをGlue Crawlerで読み込み、以下のようなSQLをAthenaで実行することで、各レコードの最新の状態をGlueテーブルに持てるようにしました。

# Glue Crawlerで作成されたテーブルを`table_from_glue_crawler`とする
# 元々のRDS上のテーブルには`column_a`と`column_b`があり、`column_a`を主キーとする

CREATE TABLE IF NOT EXISTS table_a
WITH (format = 'Parquet')
AS (

    # 主キーが一致してtime_stampが最大のレコードを抽出
    # さらにその中でopが削除ではないものに絞る
    
    SELECT
        MAX_BY(column_a, time_stamp) AS column_a,
        MAX_BY(column_b, time_stamp) AS column_b
    FROM
        table_from_glue_crawler
    GROUP BY
        column_a
    HAVING
        MAX_BY(op, time_stamp) <> 'D'
)

これをQuickSightのデータセットに用いることで、RDSのデータを可視化できるようになりました。最後にGlue Crawlerを毎時で実行するように設定することで、RDSの更新が毎時で反映されるようにしました。

おわりに

いかがだったでしょうか。RDSのデータを他に流す際に、DMSを使うと早いスパンで反映ができるのは便利だなと思っています。前処理もシンプルにでき、料金もそこまで高価ではないので気軽に使っていけるのも良いですね。

最後になりますが、弊社ではエンジニア、デザイナーの募集を随時行っています。もしご興味があれば以下からご連絡ください!!

https://www.wantedly.com/companies/excite/projects

明日以降のアドベントカレンダー記事もお楽しみに!