Tutorial #2: Utilizzo dei filtri per elaborare alcuni eventi con DynamoDB e Lambda - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Tutorial #2: Utilizzo dei filtri per elaborare alcuni eventi con DynamoDB e Lambda

In questo tutorial, creerai un AWS Lambda trigger per elaborare solo alcuni eventi in un flusso da una tabella DynamoDB.

Con il filtro eventi Lambda è possibile utilizzare espressioni di filtro per controllare quali eventi Lambda invia alla funzione per l'elaborazione. Puoi configurare fino a 5 diversi filtri per i flussi DynamoDB. Se si utilizzano finestre di batch, Lambda applica i criteri di filtro a ogni nuovo evento per stabilire se aggiungerlo al batch corrente.

I filtri vengono applicati tramite strutture chiamate FilterCriteria. I 3 attributi principali di FilterCriteria sono metadata properties, data properties e filter patterns.

Ecco una struttura di esempio di un evento di flussi DynamoDB:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties sono i campi dell'oggetto evento. Nel caso di flussi DynamoDB, metadata properties sono campi come dynamodb o eventName.

data properties sono i campi del corpo dell'evento. Per filtrare su data properties, bisogna assicurarsi di contenerli in FilterCriteria all'interno della chiave appropriata. Per le origini eventi Dynamo DB, la chiave dati è NewImage o OldImage.

Infine, le regole di filtro definiranno l'espressione del filtro che si desidera applicare a una proprietà specifica. Ecco alcuni esempi:

Operatore di confronto Esempio Sintassi delle regole (parziale)

Null

Il tipo di prodotto è null

{ "product_type": { "S": null } }

Empty

Il nome del prodotto è vuoto

{ "product_name": { "S": [ ""] } }

Equals

Lo stato equivale a Florida

{ "state": { "S": ["FL"] } }

And

Lo stato del prodotto equivale a Florida e la categoria di prodotto è Chocolate

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Or

Lo stato del prodotto è Florida o California

{ "state": { "S": ["FL","CA"] } }

Not

Lo stato del prodotto non è Florida

{"state": {"S": [{"anything-but": ["FL"]}]}}

Exists

Esiste il prodotto artigianale

{"homemade": {"S": [{"exists": true}]}}

Does not exist

Il prodotto "homemade" non esiste

{"homemade": {"S": [{"exists": false}]}}

Begins with

PK inizia con COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Per una funzione Lambda è possibile specificare fino a 5 modelli di filtro eventi. Si noti che ognuno di questi 5 eventi verrà valutato come un OR logico. Quindi, se configuri due filtri denominati Filter_One e Filter_Two, la funzione Lambda eseguirà Filter_One OR Filter_Two.

Nota

Nella pagina di filtraggio degli eventi Lambda ci sono alcune opzioni per filtrare e confrontare valori numerici, tuttavia nel caso degli eventi di filtro DynamoDB ciò non si applica perché i numeri in DynamoDB vengono memorizzati come stringhe. Ad esempio "quantity": { "N": "50" }, sappiamo che è un numero a causa della proprietà "N".

Mettere tutto insieme - AWS CloudFormation

Per mostrare in pratica la funzionalità di filtraggio degli eventi, ecco un modello di esempio CloudFormation. Questo modello genererà una tabella DynamoDB semplice con una chiave di partizione PK e una chiave di ordinamento SK con flussi Amazon DynamoDB abilitati. Creerà una funzione Lambda e un semplice ruolo di esecuzione Lambda che consentirà di scrivere registri su Amazon Cloudwatch e leggere gli eventi dai flussi Amazon DynamoDB. Aggiungerà anche la mappatura dell'origine eventi tra flussi DynamoDB e la funzione Lambda, in modo che la funzione possa essere eseguita ogni volta che c'è un evento nei flussi Amazon DynamoDB.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Dopo aver distribuito questo modello di CloudFormation, puoi inserire il seguente elemento Amazon DynamoDB:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Grazie alla semplice funzione lambda inclusa in linea in questo modello di formazione cloud, vedrai gli eventi nei gruppi di CloudWatch log di Amazon per la funzione lambda come segue:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Esempi di filtri

  • Solo prodotti che corrispondono a un determinato stato

Questo esempio modifica il CloudFormation modello per includere un filtro per abbinare tutti i prodotti provenienti dalla Florida, con l'abbreviazione «FL».

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Dopo aver ridistribuito lo stack, puoi aggiungere il seguente elemento DynamoDB alla tabella. Si noti che non verrà visualizzato nei registri delle funzioni Lambda, poiché il prodotto in questo esempio proviene dalla California.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Solo gli elementi che iniziano con alcuni valori in PK e SK

Questo esempio modifica il CloudFormation modello per includere la seguente condizione:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Nota che la AND condizione richiede che la condizione sia all'interno del pattern, dove le chiavi PK e SK si trovano nella stessa espressione separate da una virgola.

O inizia con alcuni valori su PK e SK o proviene da un determinato stato.

Questo esempio modifica il CloudFormation modello per includere le seguenti condizioni:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Si noti che la condizione OR viene aggiunta introducendo nuovi modelli nella sezione del filtro.

Mettere tutto insieme - CDK

Il seguente modello di formazione CDK del progetto di esempio illustra la funzionalità di filtraggio degli eventi. Prima di lavorare con questo CDK progetto è necessario installare i prerequisiti, tra cui l'esecuzione degli script di preparazione.

Creare un progetto CDK

Per prima cosa crea un nuovo AWS CDK progetto, invocandolo cdk init in una directory vuota.

mkdir ddb_filters cd ddb_filters cdk init app --language python

Il comando cdk init utilizza il nome della cartella del progetto per denominare vari elementi del progetto, tra cui classi, sottocartelle e file. Tutti i trattini nel nome della cartella vengono convertiti in caratteri di sottolineatura. Altrimenti il nome dovrebbe seguire il formato di un identificatore Python. Ad esempio, non dovrebbe iniziare con un numero o contenere spazi.

Per lavorare con il nuovo progetto, attivare il suo ambiente virtuale. Ciò consente di installare le dipendenze del progetto localmente nella cartella del progetto, anziché globalmente.

source .venv/bin/activate python -m pip install -r requirements.txt
Nota

Potresti riconoscerlo come il comando Mac/Linux per attivare un ambiente virtuale. I modelli Python includono un file batch, source.bat, che consente di utilizzare lo stesso comando su Windows. Funziona anche il comando Windows tradizionale .venv\Scripts\activate.bat. Se hai inizializzato il tuo AWS CDK progetto utilizzando AWS CDK Toolkit v1.70.0 o versioni precedenti, il tuo ambiente virtuale si trova invece nella directory. .env .venv

Infrastruttura di base

Apri il file ./ddb_filters/ddb_filters_stack.py con l'editor di testo preferito. Questo file è stato generato automaticamente al momento della creazione del AWS CDK progetto.

Quindi, aggiungi le funzioni _create_ddb_table e _set_ddb_trigger_function. Queste funzioni creeranno una tabella DynamoDB con chiave di partizione PK e una chiave di ordinamento SK in modalità di assegnazione in modalità on-demand, con flussi Amazon DynamoDB abilitato per impostazione predefinita per mostrare immagini nuove e vecchie.

La funzione Lambda verrà archiviata nella cartella lambda nel file app.py. Questo file verrà creato in seguito. Comprenderà una variabile di ambiente APP_TABLE_NAME, che sarà il nome della tabella Amazon DynamoDB creata da questo stack. Nella stessa funzione concederemo alla funzione Lambda le autorizzazioni di lettura del flusso. Infine, verrà effettuata la sottoscrizione a flussi DynamoDB come origine degli eventi per la funzione Lambda.

Alla fine del file nel metodo __init__, richiamerai i rispettivi costrutti per inizializzarli nello stack. Per progetti più grandi che richiedono componenti e servizi aggiuntivi, potrebbe essere meglio definire questi costrutti al di fuori dello stack di base.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Ora creeremo una funzione lambda molto semplice che stamperà i log in Amazon. CloudWatch Per farlo, crea una nuova cartella denominata lambda.

mkdir lambda touch app.py

Usando l'editor di testo preferito, aggiungi il seguente contenuto al file app.py:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Assicurati di essere nella cartella /ddb_filters/, digita il seguente comando per creare l'applicazione di esempio:

cdk deploy

A un certo punto ti verrà chiesto di confermare se desideri implementare la soluzione. Accetta le modifiche digitando Y.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Una volta implementate le modifiche, apri la AWS console e aggiungi un elemento alla tabella.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

I CloudWatch log dovrebbero ora contenere tutte le informazioni di questa voce.

Esempi di filtri

  • Solo prodotti che corrispondono a un determinato stato

Apri il file ddb_filters/ddb_filters/ddb_filters_stack.py e modificalo per includere il filtro che corrisponde a tutti i prodotti equivalenti a "FL". Questo può essere modificato appena sotto event_subscription nella riga 45.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Solo gli elementi che iniziano con alcuni valori in PK e SK

Modifica lo script Python per includere la seguente condizione:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • O inizia con alcuni valori su PK e SK o proviene da un determinato stato.

Modifica lo script Python per includere le seguenti condizioni:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Si noti che la condizione OR viene aggiunta aggiungendo altri elementi all'array Filters.

Pulizia

Individua lo stack di filtri nella base della tua directory di lavoro ed esegui cdk destroy. Ti verrà chiesto di confermare l'eliminazione della risorsa:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y