Cómo Lambda procesa los registros de Amazon Kinesis Data Streams
Puede utilizar una función de Lambda para procesar los registros de un flujo de datos de Amazon Kinesis. Puede asignar una función de Lambda a un consumidor de rendimiento compartido (iterador estándar) de Kinesis Data Streams o a un consumidor de rendimiento dedicado con distribución ramificada mejorada. Para iteradores estándar, Lambda sondea cada partición de la secuencia de Kinesis en busca de registros utilizando el protocolo HTTP. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.
Para obtener información detallada sobre los flujos de datos de Kinesis, consulte Lectura de datos de Amazon Kinesis Data Streams.
nota
Kinesis cobra por cada partición y, para una distribución ramificada mejorada, por los datos leídos desde el flujo. Para obtener más información sobre precios, consulte Precios de Amazon Kinesis
Temas
- Flujos de sondeo y procesamiento por lotes
- Evento de ejemplo
- Procesamiento de registros de Amazon Kinesis Data Streams
- Configuración de la respuesta por lotes parcial con Kinesis Data Streams y Lambda
- Conserve los registros de lotes descartados para unorigen de eventos de Kinesis Data Streams en Lambda
- Implementación del procesamiento con estado de Kinesis Data Streams en Lambda
- Parámetros de Lambda para las asignaciones de orígenes de eventos de Amazon Kinesis Data Streams
- Uso del filtrado de eventos con una fuente de eventos de Kinesis
- Tutorial: Uso de Lambda con Kinesis Data Streams
Flujos de sondeo y procesamiento por lotes
Lambda lee los registros del flujo de datos e invoca la función sincrónicamente con un evento que contiene registros de flujo. Lambda lee los registros por lotes e invoca la función para procesar los registros del lote. Cada lote contiene registros de una única partición o flujo de datos.
Para los flujos de datos estándar de Kinesis, Lambda sondea cada partición en el flujo para buscar registros una vez por segundo. En el caso de la distribución mejorada de Kinesis, Lambda utiliza una conexión HTTP/2 para escuchar los registros que se envían desde Kinesis. Cuando hay registros disponibles, Lambda invoca la función y espera el resultado.
De forma predeterminada, Lambda invoca su función tan pronto como los registros estén disponibles. Si el lote que Lambda lee del origen de eventos solo tiene un registro, Lambda envía solo un registro a la función. Para evitar invocar la función con un número de registros pequeño, puede indicar al origen de eventos que almacene en búfer registros durante hasta 5 minutos configurando un plazo de procesamiento por lotes. Antes de invocar la función, Lambda continúa leyendo los registros del origen de eventos hasta que haya recopilado un lote completo, venza el plazo de procesamiento por lotes o el lote alcance el límite de carga de 6 MB. Para obtener más información, consulte Comportamiento de procesamiento por lotes.
aviso
Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez, y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte ¿Cómo puedo hacer que mi función de Lambda sea idempotente?
Lambda no espera a que se completen las extensiones configuradas antes de enviar el siguiente lote para su procesamiento. En otras palabras, las extensiones pueden seguir ejecutándose mientras Lambda procesa el siguiente lote de registros. Esto puede provocar problemas de limitación si infringe alguno de los ajustes o límites de simultaneidad de la cuenta. Para detectar si se trata de un posible problema, supervise sus funciones y compruebe si ve métricas de simultaneidad más elevadas de lo esperado para la asignación de orígenes de eventos. Debido a los tiempos cortos entre invocaciones, Lambda puede informar brevemente un uso de simultaneidad superior al número de particiones. Esto puede ser cierto incluso para las funciones de Lambda sin extensiones.
Defina la configuración ParallelizationFactor para procesar una partición de un flujo de datos de Kinesis con más de una invocación de Lambda simultáneamente. Puede especificar el número de lotes simultáneos que Lambda sondea desde una partición a través de un factor de paralelización de 1 (predeterminado) a 10. Por ejemplo, cuando establece ParallelizationFactor
en 2, puede tener un máximo de 200 invocaciones de Lambda simultáneas para procesar 100 particiones de datos de Kinesis (aunque, en la práctica, es posible que observe diferentes valores para la métrica ConcurrentExecutions
). Esto ayuda a escalar verticalmente el rendimiento de procesamiento cuando el volumen de datos es volátil y el IteratorAge
es alto. Cuando aumenta el número de lotes simultáneos por partición, Lambda sigue garantizando el procesamiento en orden a nivel de clave de partición.
También puede utilizar ParallelizationFactor
con la agregación de Kinesis. El comportamiento de la asignación de orígenes de eventos depende de si utiliza la distribución ramificada mejorada:
-
Sin distribución ramificada mejorada: todos los eventos incluidos en un evento agregado deben tener la misma clave de partición. La clave de partición también debe coincidir con la del evento agregado. Si los eventos incluidos en el evento agregado tienen claves de partición diferentes, Lambda no puede garantizar el procesamiento de los eventos ordenados por clave de partición.
-
Con distribución ramificada mejorada: en primer lugar, Lambda decodifica el evento agregado en sus eventos individuales. El evento agregado puede tener una clave de partición diferente a la de los eventos que contiene. Sin embargo, los eventos que no se corresponden con la clave de partición se eliminan y se pierden
. Lambda no procesa estos eventos ni los envía a un destino de error configurado.
Evento de ejemplo
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }