Amazon DynamoDB テーブルからストリーミングデータをロードする - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon DynamoDB テーブルからストリーミングデータをロードする

AWS Lambda を使用して、Amazon DynamoDB から OpenSearch Service ドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
DynamoDB テーブル

このテーブルにはソースデータが含まれています。詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB テーブルの基本的なオペレーション」を参照してください。

このテーブルは、OpenSearch Service ドメインと同じリージョンに存在している必要があり、ストリームが [新しいイメージ] に設定されている必要があります。詳細については、「ストリームの有効化」を参照してください。

OpenSearch Service ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch サービスドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように OpenSearch Service、DynamoDB、Lambda の基本的な実行許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージを作成する」の手順に従いますが、ddb-to-opensearch という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

まだ持っていない場合、pip をインストールしてから、次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

次に、「Lambda 関数を作成する」の手順に従いますが、「前提条件」の IAM ロールと、トリガーの以下の設定を指定します。

  • [テーブル]: DynamoDB テーブル

  • [バッチサイズ]: 100

  • [開始位置]: 水平トリム

詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB Streams と Lambda を用いた新しい項目の処理」を参照してください。

この時点で、すべてのリソースが揃いました (ソースデータの DynamoDB テーブル、テーブルに対する DynamoDB ストリームの変更、ソースデータが変更されてそれらの変更のインデックスを作成した後に実行される関数、検索および可視化のための OpenSearch Service ドメイン)。

Lambda 関数をテストする

関数を作成した後、AWS CLI を使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }