使用 Amazon Kinesis Data Streams 作为目标 AWS Database Migration Service - AWS 数据库迁移服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon Kinesis Data Streams 作为目标 AWS Database Migration Service

您可以使用将数据迁移 AWS DMS 到 Amazon Kinesis 数据流。Amazon Kinesis 数据流是 Amazon Kinesis Data Streams 服务的一部分。可以使用 Kinesis 数据流实时收集和处理大型数据记录流。

Kinesis 数据流由分片组成。分片是流中数据记录的唯一标识序列。有关 Amazon Kinesis Data Streams 中的分片的更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的分片

AWS Database Migration Service 使用将记录发布到 Kinesis 数据流。JSON在转换过程中,将源数据库中的每条记录 AWS DMS 序列化为JSON格式或 JSON _ UNFORMATTED 消息格式的属性值对。JSON_ UNFORMATTED 消息格式是带有换行分隔符的单行JSON字符串。它允许 Amazon Data Firehose 将 Kinesis 数据传递到 Amazon S3 目标,然后使用各种查询引擎(包括 Amazon Athena)进行查询。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义分区键,Kinesis Data Streams 用它来将数据分组为分片。

在 Kinesis Data Streams 目标端点上 AWS DMS 创建表时,它创建的表与源数据库终端节点中创建的表数量一样多。 AWS DMS 还会设置多个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

注意

AWS DMS 控制台上的 “SSL模式” 选项或 “API不适用于某些数据流” 和 “无” SQL 服务,例如 Kinesis 和 DynamoDB。默认情况下,它们是安全的,因此 AWS DMS 显示SSL模式设置等于无(SSLMode=None)。您无需为终端节点提供任何其他配置即可使用SSL。例如,使用 Kinesis 作为目标端点时,默认情况下它是安全的。所有API对 Kinesis 的调用都使用SSL,因此无需在端点中添加其他SSL选项。 AWS DMS 您可以使用协议安全地放置数据并通过SSL端点检索数据,该HTTPS协议在连接到 Kinesis 数据流时默认 AWS DMS 使用。

Kinesis Data Streams 端点设置

当您使用 Kinesis Data Streams 目标端点时,您可以使用中的 AWS DMS API选项获取交易和控制细节。KinesisSettings

您可以使用以下方法之一设置连接:

  • 在 AWS DMS 控制台中,使用端点设置。

  • 在中CLI,使用CreateEndpoint命令的kinesis-settings选项。

在中CLI,使用该kinesis-settings选项的以下请求参数:

注意

AWS DMS 版本 3.4.1 及更高版本支持 IncludeNullAndEmpty 端点设置。但是,中支持Kinesis Data Streams目标的以下其他端点设置。 AWS DMS

  • MessageFormat – 在端点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • IncludeControlDetails – 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认为 false

  • IncludeNullAndEmpty— 在目标中包含列NULL并清空列。默认为 false

  • IncludePartitionValue – 在 Kinesis 消息输出中显示分区值,除非分区类型为 schema-table-type。默认为 false

  • IncludeTableAlterOperations— 包括更改控制数据中表的任何数据定义语言 (DDL) 操作,例如rename-tabledrop-tableadd-columndrop-column、和rename-column。默认为 false

  • IncludeTransactionDetails – 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id (事务内的记录偏移)的值。默认为 false

  • PartitionIncludeSchemaTable – 当分区类型为 primary-key-type 时,将架构和表名称作为分区值的前缀。这样做会增加 Kinesis 分片之间的数据分布。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在此况下,同一主键将从数千个表发送到同一个分片,这会导致限制。默认为 false

  • useLargeIntegerValue— 使用最多 18 位的 int,而不是将整数转换为双精度,从 3.5.4 AWS DMS 版本中可用。默认值为 false。

以下示例显示在使用 AWS CLI发出的示例 create-endpoint 命令中使用的 kinesis-settings 选项。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
多线程完全加载任务设置

为了帮助提高传输速度, AWS DMS 支持对 Kinesis Data Streams 目标实例进行多线程满载。 DMS支持这种多线程,其任务设置包括以下内容:

  • MaxFullLoadSubTasks— 使用此选项表示要并行加载的最大源表数。 DMS使用专用子任务将每个表加载到相应的 Kinesis 目标表中。默认值为 8;最大值为 49。

  • ParallelLoadThreads— 使用此选项指定用于将每个表加载到其 Kinesis 目标表的线程数。 AWS DMS Kinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize – 使用此选项指定在缓冲区(并行加载线程将数据加载到 Kinesis 目标时使用)中存储的最大记录数。默认值是 50。最大值为 1000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时 ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread – 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值是 1。但是,对于各种负载大小的 Kinesis 目标,有效范围为每个线程 5-512 个队列。

多线程CDC加载任务设置

您可以使用任务设置来修改呼叫行为,从而提高实时数据流目标端点(如 KinesisCDC)的更改数据捕获 () 的PutRecordsAPI性能。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行CDC加载并并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

为了提高CDC性能, AWS DMS 支持以下任务设置:

  • ParallelApplyThreads— 指定CDC加载期间 AWS DMS 用于将数据记录推送到 Kinesis 目标端点的并发线程数。默认值为零(0),最大值为 32。

  • ParallelApplyBufferSize— 指定每个缓冲队列中存储的最大记录数,以便在加载期间并发线程推送到 Kinesis 目标端点。CDC默认值是 100,最大值是 1,000。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread— 指定每个线程访问的队列数量,以便在队列中提取数据记录并在此期间为 Kinesis 端点生成批量加载。CDC默认值是 1,最大值是 512。

使用 ParallelApply* 任务设置时,partition-key-type 默认值是表的 primary-key,而不是 schema-name.table-name

使用之前的图像查看 Kinesis 数据流作为目标的CDC行的原始值

在向 Kinesis 等数据流目标写入CDC更新时,可以在更新更改之前查看源数据库行的原始值。为此,请根据源数据库引擎提供的数据 AWS DMS 填充更新事件之前的图像

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • Postgre 仅为属于主键的列SQL提供数据(无论是否已更改)。要为所有列提供数据(无论是否更改),您需要将 REPLICA_IDENTITY 设置为 FULL 而不是 DEFAULT。请注意,您应该仔细选择每张表的 REPLICA_IDENTITY 设置。如果设置REPLICA_IDENTITYFULL,则所有列值都将连续写入预写日志 (WAL)。这可能会导致经常更新的表出现性能或资源问题。

  • 我SQL通常会为除BLOB和CLOB数据类型(无论是否更改)之外的所有列提供数据。

要启用之前映像以便将源数据库中的原始值添加到 AWS DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings使用从源数据库系统收集的值向每个更新操作添加一个新JSON属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings适用于包含CDC组件的 AWS DMS 任务,例如满负荷加CDC任务(迁移现有数据并复制正在进行的更改),或者CDC仅适用于任务(仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认为 false

  • 使用FieldName选项为新JSON属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。要添加具有之前映像值的任何列,请使用 all。请注意,之前的图像不包含LOB数据类型的列,例如CLOB或BLOB。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注意

Amazon S3 目标不支持 BeforeImageSettings。对于 S3 目标,在映像期间仅使用要执行的add-before-image-columns转换规则CDC。

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在成像之前在数据流目标(如 Kinesis)CDC上启用。

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 add-before-image-columns 参数的 transformation 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值附加到列名称前面,column-prefix 的默认值为 BI_column-suffix 的值将附加到列名称之后,默认值为空。不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。要仅添加属于表主键一部分的列,请选择 pk-onlynon-lob选择仅添加非LOB类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您向 Amazon S3 目标写入CDC更新时,该BI_emp_no列可以判断哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作

使用 Kinesis 数据流作为目标的先决条件 AWS Database Migration Service

IAM用于使用 Kinesis 数据流作为目标的角色 AWS Database Migration Service

在将 Kinesis 数据流设置为目标之前 AWS DMS,请务必创建IAM角色。此角色必须允许 AWS DMS 代入并授予对正在迁移到的 Kinesis 数据流的访问权限。以下IAM策略中显示了最低访问权限集。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您在迁移到 Kinesis 数据流时使用的角色必须具有以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

访问 Kinesis 数据流作为目标 AWS Database Migration Service

在 3.4.7 及更高 AWS DMS 版本中,要连接到 Kinesis 终端节点,必须执行以下操作之一:

使用 Kinesis Data Streams 作为目标时的限制 AWS Database Migration Service

将 Kinesis Data Streams 作为目标时存在以下限制:

  • AWS DMS 无论事务如何,都会将每次更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的一条记录。但是,您可以使用的相关参数为每条数据记录添加交易详情KinesisSettingsAPI。

  • 不支持完整LOB模式。

  • 支持的最LOB大大小为 1 MB。

  • Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的处理重复记录

  • AWS DMS 支持以下两种形式的分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:源数据库中某个字段的值或表的主键。JSON

  • 有关在 Kinesis Data Streams 中加密静态数据的信息,请参阅《AWS Key Management Service 开发人员指南》中的 Kinesis Data Streams 中的数据保护

  • Kinesis 端点不支持 BatchApply。对 Kinesis 目标使用“批应用”(例如,BatchApplyEnabled 目标元数据任务设置)可能会导致数据丢失。

  • 只有与复制实例相同 AWS 账户的 Kinesis 数据流才支持 Kinesi AWS 区域 s 目标。

  • 从 “我的SQL源” 迁移时, BeforeImage 数据不包括CLOB和BLOB数据类型。有关更多信息,请参阅 使用之前的图像查看 Kinesis 数据流作为目标的CDC行的原始值

  • AWS DMS 不支持迁移超过 16 位BigInt数据类型的值。要解决此限制问题,您可以使用以下转换规则将 BigInt 列转换为字符串。有关转换规则的更多信息,请参阅 转换规则和操作

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

使用对象映射将数据迁移到 Kinesis 数据流

AWS DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

除了具有分区键以外,Kinesis 数据流没有预设结构。在对象映射规则中,数据记录的 partition-key-type 的可能值为 schema-tabletransaction-idprimary-keyconstantattribute-name

要创建对象映射规则,您应将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS 目前支持将map-record-to-recordmap-record-to-document作为该rule-action参数的唯一有效值。这些设置会影响未作为 exclude-columns 属性列表一部分排除的值。map-record-to-recordmap-record-to-document值指定默认情况下如何 AWS DMS 处理这些记录。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kinesis 数据流时使用 map-record-to-record。此规则类型使用关系数据库的 taskResourceId.schemaName.tableName 值作为 Kinesis 数据流中的分区键,并为源数据库中的每个列创建一个属性。

使用 map-record-to-record 时请注意:

  • 此设置仅影响 exclude-columns 列表排除的列。

  • 对于每个这样的列,在目标主题中 AWS DMS 创建一个相应的属性。

  • AWS DMS 无论源列是否用于属性映射,都会创建相应的属性。

通过使用 map-record-to-document,可使用属性名“_doc”将源列放入相应目标流中的单个平面文档中。 AWS DMS 将数据放入源上名为“_doc”的单个平面映射中。此放置应用于源表中的未在 exclude-columns 属性列表中列出的任何列。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Test 的架构迁移到 Kinesis 数据流,您将创建规则来将数据映射到目标流。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

以下内容说明 Kinesis 数据流中生成的记录格式。

  • StreamName: XXX

  • PartitionKey: 测试客户//。schmaName tableName

  • 数据://以下消息 JSON

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

但是,假设您使用相同的规则,但将 rule-action 参数更改为 map-record-to-document 并排除某些列。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

在这种情况下,exclude-columns 参数中未列出的列 FirstNameLastNameStoreIdDateOfBirth 将映射到 _doc。以下内容说明生成的记录格式。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kinesis 数据流时,您可以调整数据结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

Kinesis Data Streams 的消息格式

JSON输出只是一个键值对列表。JSON_ UNFORMATTED 消息格式是带有换行分隔符的单行JSON字符串。

AWS DMS 提供了以下保留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

操作

对于数据记录,操作可以是 loadinsertupdatedelete

对于控制记录,操作可以是 create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-columncolumn-type-change

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

Timestamp

构造JSON消息的时间戳。该字段的格式为 ISO 8601。