JavaでのAmazon Timestreamの使い方

こんにちは。 エキサイト株式会社の三浦です。

AWSには、「Amazon Timestream」というサービスがあります。

時系列データを扱うのに非常に役立つサービスであり、Javaからも簡単に使えるようSDKが用意されています。

今回は、この「Amazon Timestream」をJavaでどうやれば使えるのか、説明していきます。

Amazon Timestreamとは

Amazon TimestreamAWSが提供しているサービスで、時系列的に取り扱いたいデータを格納し、そのデータをもとに分析を行うためのものです。

aws.amazon.com

高速かつスケーラブルなサーバーレス時系列データベース

Amazon Timestream は、高速かつスケーラブルなサーバーレス時系列データベースサービスです。1 日あたり数兆件規模のイベントを最大 1,000 倍の速度でより簡単に保存および分析できます。Amazon Timestream は、容量とパフォーマンスを調整するために自動的にスケールアップまたはスケールダウンするので、基盤インフラストラクチャの管理が不要です。

マネージドサービスであるため開発者側は管理の手間を掛けることなく、ほとんど「使うこと」だけに注力することが出来ます。

使い方としては、例えば自社で取り扱っているIoT機器について、定期的にCPU使用率を計測したい、という要件があった場合に、「機器のID」と「計測時間」をキー、「CPU使用率」をバリューとしてTimestreamに定期的に投げる、というようにすることで、CPU使用率の時系列データを保存し、後から分析できるようになる、というようなものです。

もちろんIoT機器に限らず様々な場面で使用できます。

大量のアクセスを受け付け、大量の書き込みを行う必要がある、というような処理に使うのが良いでしょう。

さて、このTimestreamですが、使用するに当たって様々な言語のSDKを用意してくれています。

今回は、Javaで使うに当たっての使い方を紹介します。

JavaでTimestreamを使う方法

AWS

まずはAWSのウェブコンソールなどで、TimestreamのDB・テーブルを作っておきます。

ここでは仮に以下のように作ります。

  • DB: test_db
  • Table: test_table

Timestreamの設定

また、IAMロールに適切なポリシーも付けましょう。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowTask",
            "Effect": "Allow",
            "Action": [
                "timestream:WriteRecords", // データの書き込みに必要
                "timestream:Select", // データ読み込みに必要
                "timestream:DescribeEndpoints", // 上記2つを使う際には必要
            ],
            "Resource": "*"
        }
    ]
}

AWS側の準備はこれで完了です。

Java

書き込み

まずは、書き込みをやっていきます。

Gradleを使っている場合は、以下のライブラリを読み込んでください。

implementation platform('software.amazon.awssdk:bom:2.17.293') // バージョンは適宜変更

implementation('software.amazon.awssdk:timestreamwrite')
implementation('software.amazon.awssdk:apache-client')

続いて、書き込み用クライアントを設定します。

package sample.config;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;

import java.time.Duration;

@Configuration
@RequiredArgsConstructor
public class TimestreamConfig {
    /**
     * Timestream write clientを作成してBean化
     *
     * @return Timestream write client
     */
    @Bean
    public TimestreamWriteClient buildWriteClient() {
        ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
        httpClientBuilder.maxConnections(5000);

        RetryPolicy.Builder retryPolicy = RetryPolicy.builder();
        retryPolicy.numRetries(10);

        ClientOverrideConfiguration.Builder overrideConfig = ClientOverrideConfiguration.builder();
        overrideConfig.apiCallAttemptTimeout(Duration.ofSeconds(20));
        overrideConfig.retryPolicy(retryPolicy.build());

        return TimestreamWriteClient.builder()
                .httpClientBuilder(httpClientBuilder)
                .overrideConfiguration(overrideConfig.build())
                .region(Region.AP_NORTHEAST_1)
                .build();
    }
}

最後に、実際に書き込みます。

package sample.write;

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;
import software.amazon.awssdk.services.timestreamwrite.model.Dimension;
import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType;
import software.amazon.awssdk.services.timestreamwrite.model.Record;
import software.amazon.awssdk.services.timestreamwrite.model.TimeUnit;
import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest;

import java.time.ZonedDateTime;
import java.util.List;

@Repository
@RequiredArgsConstructor
public class SampleWrite {

    private static final Record COMMON_ATTRIBUTES = Record.builder()
            .measureValueType(MeasureValueType.VARCHAR)
            .measureName("sampleMeasureName")
            .measureValue("sampleMeasureValue")
            .version(1L)
            .build();

    private final TimestreamWriteClient timestreamWriteClient;

    public void writeSampleData() {
        final List<Dimension> dimensionList = List.of(
                Dimension.builder().name("sampleDimensionName").value("sampleDimensionValue").build()
        );

        final long currentTimestamp = ZonedDateTime.now().toEpochSecond();

        final List<Record> recordList = List.of(
                Record.builder()
                        .dimensions(dimensionList)
                        .time(String.valueOf(currentTimestamp))
                        .timeUnit(TimeUnit.SECONDS)
                        .build()
        );

        WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder()
                .databaseName("test_db")
                .tableName("test_table")
                .records(recordList)
                .commonAttributes(COMMON_ATTRIBUTES)
                .build();

        timestreamWriteClient.writeRecords(writeRecordsRequest);
    }
}

これで書き込むことが出来ました!

Timestreamに書き込んだデータ

書き込み

続いて、今書き込んだデータを読み込んでみます。

Gradleを使っている場合は、以下のライブラリを読み込んでください。

implementation platform('software.amazon.awssdk:bom:2.17.293') // バージョンは適宜変更

implementation('software.amazon.awssdk:timestreamquery')

続いて、読み込み用クライアントを設定します。

package sample.config;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;

@Configuration
@RequiredArgsConstructor
public class TimestreamConfig {
    /**
     * Timestream query clientを作成してBean化
     *
     * @return Timestream query client
     */
    @Bean
    public TimestreamQueryClient buildQueryClient() {
        return TimestreamQueryClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
    }
}

最後に、実際に読み込みます。

package sample.read;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamquery.model.ColumnInfo;
import software.amazon.awssdk.services.timestreamquery.model.Datum;
import software.amazon.awssdk.services.timestreamquery.model.QueryRequest;
import software.amazon.awssdk.services.timestreamquery.model.QueryResponse;
import software.amazon.awssdk.services.timestreamquery.model.Row;
import software.amazon.awssdk.services.timestreamquery.model.Type;

import java.util.List;
import java.util.Objects;

@Repository
@RequiredArgsConstructor
@Slf4j
public class SampleRead {
    private final TimestreamQueryClient timestreamQueryClient;

    public void readSampleData() {
        final String query =
                """
                SELECT
                   sampleDimensionName
                FROM
                   "test_db"."test_table"
                """;

        final QueryRequest queryRequest = QueryRequest.builder().queryString(query).build();

        timestreamQueryClient
                .queryPaginator(queryRequest)
                .stream()
                .forEach(queryResponse -> this.parseQueryResult(queryResponse));
    }

    /**
     * Timestreamからのレスポンスをパースする
     *
     * @param response Timestreamからのレスポンス
     */
    private void parseQueryResult(QueryResponse response) {
        final List<ColumnInfo> columnInfo = response.columnInfo();

        response.rows().forEach(row -> this.parseRow(columnInfo, row));
    }

    /**
     * 各行をレスポンスモデルに変換
     *
     * @param columnInfo カラムのメタデータ
     * @param row 行データ
     */
    private void parseRow(List<ColumnInfo> columnInfo, Row row) {
        final List<Datum> data = row.data();

        for (int j = 0; j < data.size(); j++) {
            final Datum datum = data.get(j);
            final ColumnInfo info = columnInfo.get(j);

            if (Objects.isNull(info.name())) {
                throw new RuntimeException();
            }

            final String stringValue = this.parseDatum(info, datum);

            if ("sampleDimensionName".equals(info.name())) {
                log.info(stringValue);
            }
        }
    }

    /**
     * 各データをパースする
     *
     * @param info メタデータ
     * @param datum データ本体
     * @return パース結果の文字列
     */
    private String parseDatum(ColumnInfo info, Datum datum) {
        // NOTE: 今回は読み込むデータがScalarValueしかありえなかったのでこうしているが、他の型も来る場合はこのあたりを変更する

        if (Objects.nonNull(datum.nullValue()) && datum.nullValue()) {
            throw new RuntimeException();
        }

        Type columnType = info.type();
        if (Objects.nonNull(columnType.timeSeriesMeasureValueColumnInfo())) {
            throw new RuntimeException();
        }

        if (Objects.nonNull(columnType.arrayColumnInfo())) {
            throw new RuntimeException();
        }

        if (Objects.nonNull(columnType.rowColumnInfo()) && columnType.rowColumnInfo().size() > 0) {
            throw new RuntimeException();
        }

        return datum.scalarValue();
    }
}

これで、実際にデータを読み込むことが出来ました!

Timestreamから読み込んだデータ

最後に

データの読み込みのパースは少し複雑でしたが、それ以外は非常に簡単にデータの読み書きが出来たことがわかると思います。

書き込みがヘビーなサービスであれば使い所も出てくると思いますので、もしそういったサービスを扱っている場合は、ぜひ検討してみてください!

参考

docs.aws.amazon.com