Lambda 如何處理來自 Amazon Kinesis Data Streams 的記錄 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Lambda 如何處理來自 Amazon Kinesis Data Streams 的記錄

您可以使用 Lambda 函數來處理 Amazon Kinesis 資料串流中的記錄。您可以將 Lambda 函數對應至 Kinesis Data Streams 共用輸送量取用程式 (標準迭代程式),或對應至具有增強散發功能的專用輸送量取用程式。對於標準迭代程式,Lambda 會使用 HTTP 協定輪詢 Kinesis 串流中的每個碎片以尋找記錄。事件來源映射會與碎片的其他取用者共用讀取傳輸量。

如需 Kinesis 資料串流的詳細資訊,請參閱從 Amazon Kinesis Data Streams 讀取資料

注意

Kinesis 會收取每個碎片的費用,以及從串流讀取資料的增強型散發。如需定價的詳細資訊,請參閱 Amazon Kinesis 定價

輪詢和批次處理串流

Lambda 會從資料串流讀取記錄並透過包含串流記錄的事件同步調用函數。Lambda 會讀取批次中的記錄並調用函數,以處理來自該批次的記錄。每個批次包含來自單一碎片/資料串流的記錄。

對於標準 Kinesis Data Streams,Lambda 會輪詢串流中的碎片中的記錄,速率為每個碎片每秒一次。對於 Kinesis 增強型散發,Lambda 會使用 HTTP/2 連線偵聽從 Kinesis 推送的記錄。當記錄可用時,Lambda 會調用您的函數,並等待結果。

Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄,Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數,您可設定批次間隔,請求事件來源緩衝記錄最長達五分鐘。調用函數之前,Lambda 會繼續從事件來源中讀取記錄,直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需詳細資訊,請參閱批次處理行為

警告

Lambda 事件來源映射至少會處理每個事件一次,而且可能會重複處理記錄。為避免與重複事件相關的潛在問題,強烈建議您讓函數程式碼具有等冪性。如需詳細資訊,請參閱 AWS 知識中心中的如何讓 Lambda 函數具有冪等性

Lambda 在傳送下一批進行處理前不會等待任何已設定的擴充完成。換句話說,當 Lambda 處理下一批記錄時,您的擴充功能可能會繼續執行。如果您違反任何帳戶的 並行 設定或限制,便可能會產生限流的問題。若要偵測此是否為潛在問題,請監控您的函數,並確認您看到的 並行指標 是否高於事件來源映射的預期值。由於兩次調用之間的時間很短,Lambda 可能會短暫報告比碎片數目更高的並行用量。即使對於沒有延伸項目的 Lambda 函數也可能如此。

設定 ParallelizationFactor 設定,同時使用多個 Lambda 調用來處理 Kinesis 資料串流的一個碎片。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。例如,當 ParallelizationFactor 設定為 2 時,您最多可以有 200 個並行 Lambda 調用,來處理 100 個 Kinesis 資料碎片 (不過在實務中,ConcurrentExecutions 指標可能有不同值)。當資料量急劇波動並且 IteratorAge 高時,這有助於縱向擴展處理輸送量。如果增加每個碎片的並行批次數量,Lambda 仍會確保在分割區索引鍵層級進行順序處理。

您也可以搭配 Kinesis 彙總使用 ParallelizationFactor。事件來源映射的行為取決於您是否使用增強型散發

  • 沒有使用增強型散發:彙總事件內的所有事件都必須具有相同的分割區索引鍵。分割區索引鍵也必須與彙總事件的分割區索引鍵相符。如果彙總事件內的事件具有不同的分割區索引鍵,則 Lambda 無法保證依分割區索引鍵依序處理事件。

  • 使用增強型散發:首先,Lambda 會將彙總的事件解碼為其個別事件。彙總的事件可以具有與其包含的事件不同的分割區索引鍵。不過,未對應至分割區索引鍵的事件會遭到捨棄和遺失。Lambda 不會處理這些事件,也不會將其傳送至設定的失敗目的地。

範例事件

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }