こんにちは。 エキサイト株式会社の三浦です。
AWSには、「Amazon Timestream」というサービスがあります。
時系列データを扱うのに非常に役立つサービスであり、Javaからも簡単に使えるようSDKが用意されています。
今回は、この「Amazon Timestream」をJavaでどうやれば使えるのか、説明していきます。
Amazon Timestreamとは
Amazon Timestream はAWSが提供しているサービスで、時系列的に取り扱いたいデータを格納し、そのデータをもとに分析を行うためのものです。
高速かつスケーラブルなサーバーレス時系列データベース
Amazon Timestream は、高速かつスケーラブルなサーバーレス時系列データベースサービスです。1 日あたり数兆件規模のイベントを最大 1,000 倍の速度でより簡単に保存および分析できます。Amazon Timestream は、容量とパフォーマンスを調整するために自動的にスケールアップまたはスケールダウンするので、基盤インフラストラクチャの管理が不要です。
マネージドサービスであるため開発者側は管理の手間を掛けることなく、ほとんど「使うこと」だけに注力することが出来ます。
使い方としては、例えば自社で取り扱っているIoT機器について、定期的にCPU使用率を計測したい、という要件があった場合に、「機器のID」と「計測時間」をキー、「CPU使用率」をバリューとしてTimestreamに定期的に投げる、というようにすることで、CPU使用率の時系列データを保存し、後から分析できるようになる、というようなものです。
もちろんIoT機器に限らず様々な場面で使用できます。
大量のアクセスを受け付け、大量の書き込みを行う必要がある、というような処理に使うのが良いでしょう。
さて、このTimestreamですが、使用するに当たって様々な言語のSDKを用意してくれています。
今回は、Javaで使うに当たっての使い方を紹介します。
JavaでTimestreamを使う方法
AWS側
まずはAWSのウェブコンソールなどで、TimestreamのDB・テーブルを作っておきます。
ここでは仮に以下のように作ります。
- DB: test_db
- Table: test_table
また、IAMロールに適切なポリシーも付けましょう。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowTask", "Effect": "Allow", "Action": [ "timestream:WriteRecords", // データの書き込みに必要 "timestream:Select", // データ読み込みに必要 "timestream:DescribeEndpoints", // 上記2つを使う際には必要 ], "Resource": "*" } ] }
AWS側の準備はこれで完了です。
Java側
書き込み
まずは、書き込みをやっていきます。
Gradleを使っている場合は、以下のライブラリを読み込んでください。
implementation platform('software.amazon.awssdk:bom:2.17.293') // バージョンは適宜変更 implementation('software.amazon.awssdk:timestreamwrite') implementation('software.amazon.awssdk:apache-client')
続いて、書き込み用クライアントを設定します。
package sample.config; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient; import java.time.Duration; @Configuration @RequiredArgsConstructor public class TimestreamConfig { /** * Timestream write clientを作成してBean化 * * @return Timestream write client */ @Bean public TimestreamWriteClient buildWriteClient() { ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder(); httpClientBuilder.maxConnections(5000); RetryPolicy.Builder retryPolicy = RetryPolicy.builder(); retryPolicy.numRetries(10); ClientOverrideConfiguration.Builder overrideConfig = ClientOverrideConfiguration.builder(); overrideConfig.apiCallAttemptTimeout(Duration.ofSeconds(20)); overrideConfig.retryPolicy(retryPolicy.build()); return TimestreamWriteClient.builder() .httpClientBuilder(httpClientBuilder) .overrideConfiguration(overrideConfig.build()) .region(Region.AP_NORTHEAST_1) .build(); } }
最後に、実際に書き込みます。
package sample.write; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient; import software.amazon.awssdk.services.timestreamwrite.model.Dimension; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType; import software.amazon.awssdk.services.timestreamwrite.model.Record; import software.amazon.awssdk.services.timestreamwrite.model.TimeUnit; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest; import java.time.ZonedDateTime; import java.util.List; @Repository @RequiredArgsConstructor public class SampleWrite { private static final Record COMMON_ATTRIBUTES = Record.builder() .measureValueType(MeasureValueType.VARCHAR) .measureName("sampleMeasureName") .measureValue("sampleMeasureValue") .version(1L) .build(); private final TimestreamWriteClient timestreamWriteClient; public void writeSampleData() { final List<Dimension> dimensionList = List.of( Dimension.builder().name("sampleDimensionName").value("sampleDimensionValue").build() ); final long currentTimestamp = ZonedDateTime.now().toEpochSecond(); final List<Record> recordList = List.of( Record.builder() .dimensions(dimensionList) .time(String.valueOf(currentTimestamp)) .timeUnit(TimeUnit.SECONDS) .build() ); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName("test_db") .tableName("test_table") .records(recordList) .commonAttributes(COMMON_ATTRIBUTES) .build(); timestreamWriteClient.writeRecords(writeRecordsRequest); } }
これで書き込むことが出来ました!
書き込み
続いて、今書き込んだデータを読み込んでみます。
Gradleを使っている場合は、以下のライブラリを読み込んでください。
implementation platform('software.amazon.awssdk:bom:2.17.293') // バージョンは適宜変更 implementation('software.amazon.awssdk:timestreamquery')
続いて、読み込み用クライアントを設定します。
package sample.config; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient; @Configuration @RequiredArgsConstructor public class TimestreamConfig { /** * Timestream query clientを作成してBean化 * * @return Timestream query client */ @Bean public TimestreamQueryClient buildQueryClient() { return TimestreamQueryClient.builder() .region(Region.AP_NORTHEAST_1) .build(); } }
最後に、実際に読み込みます。
package sample.read; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Repository; import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient; import software.amazon.awssdk.services.timestreamquery.model.ColumnInfo; import software.amazon.awssdk.services.timestreamquery.model.Datum; import software.amazon.awssdk.services.timestreamquery.model.QueryRequest; import software.amazon.awssdk.services.timestreamquery.model.QueryResponse; import software.amazon.awssdk.services.timestreamquery.model.Row; import software.amazon.awssdk.services.timestreamquery.model.Type; import java.util.List; import java.util.Objects; @Repository @RequiredArgsConstructor @Slf4j public class SampleRead { private final TimestreamQueryClient timestreamQueryClient; public void readSampleData() { final String query = """ SELECT sampleDimensionName FROM "test_db"."test_table" """; final QueryRequest queryRequest = QueryRequest.builder().queryString(query).build(); timestreamQueryClient .queryPaginator(queryRequest) .stream() .forEach(queryResponse -> this.parseQueryResult(queryResponse)); } /** * Timestreamからのレスポンスをパースする * * @param response Timestreamからのレスポンス */ private void parseQueryResult(QueryResponse response) { final List<ColumnInfo> columnInfo = response.columnInfo(); response.rows().forEach(row -> this.parseRow(columnInfo, row)); } /** * 各行をレスポンスモデルに変換 * * @param columnInfo カラムのメタデータ * @param row 行データ */ private void parseRow(List<ColumnInfo> columnInfo, Row row) { final List<Datum> data = row.data(); for (int j = 0; j < data.size(); j++) { final Datum datum = data.get(j); final ColumnInfo info = columnInfo.get(j); if (Objects.isNull(info.name())) { throw new RuntimeException(); } final String stringValue = this.parseDatum(info, datum); if ("sampleDimensionName".equals(info.name())) { log.info(stringValue); } } } /** * 各データをパースする * * @param info メタデータ * @param datum データ本体 * @return パース結果の文字列 */ private String parseDatum(ColumnInfo info, Datum datum) { // NOTE: 今回は読み込むデータがScalarValueしかありえなかったのでこうしているが、他の型も来る場合はこのあたりを変更する if (Objects.nonNull(datum.nullValue()) && datum.nullValue()) { throw new RuntimeException(); } Type columnType = info.type(); if (Objects.nonNull(columnType.timeSeriesMeasureValueColumnInfo())) { throw new RuntimeException(); } if (Objects.nonNull(columnType.arrayColumnInfo())) { throw new RuntimeException(); } if (Objects.nonNull(columnType.rowColumnInfo()) && columnType.rowColumnInfo().size() > 0) { throw new RuntimeException(); } return datum.scalarValue(); } }
これで、実際にデータを読み込むことが出来ました!
最後に
データの読み込みのパースは少し複雑でしたが、それ以外は非常に簡単にデータの読み書きが出来たことがわかると思います。
書き込みがヘビーなサービスであれば使い所も出てくると思いますので、もしそういったサービスを扱っている場合は、ぜひ検討してみてください!