Uso de Apache Kafka como objetivo para AWS Database Migration Service - AWS Database Migration Service

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de Apache Kafka como objetivo para AWS Database Migration Service

Puede usarlo AWS DMS para migrar datos a un clúster de Apache Kafka. Apache Kafka es una plataforma de streaming distribuida. Puede utilizar Apache Kafka para ingerir y procesar datos de streaming en tiempo real.

AWS también ofrece Amazon Managed Streaming para que Apache Kafka MSK (Amazon) lo utilice como objetivo. AWS DMS Amazon MSK es un servicio de streaming de Apache Kafka totalmente gestionado que simplifica la implementación y la gestión de las instancias de Apache Kafka. Funciona con versiones de código abierto de Apache Kafka y se accede a las MSK instancias de Amazon como AWS DMS destinos exactamente igual que a cualquier instancia de Apache Kafka. Para obtener más información, consulta ¿Qué es AmazonMSK? en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Un clúster de Kafka almacena flujos de registros en categorías denominadas temas que se dividen en particiones. Las particiones son secuencias de registros de datos (mensajes) identificados de forma única en un tema. Las particiones se pueden distribuir entre varios agentes de un clúster para permitir el procesamiento paralelo de los registros de un tema. Para obtener más información sobre temas y particiones y su distribución en Apache Kafka, consulte Temas y registros y Distribución.

El clúster de Kafka puede ser una MSK instancia de Amazon, un clúster que se ejecute en una EC2 instancia de Amazon o un clúster local. Una MSK instancia de Amazon o un clúster de una EC2 instancia de Amazon pueden estar en la misma instancia VPC o en una diferente. Si el clúster está en las instalaciones, puede usar su propio servidor de nombres en las instalaciones para la instancia de replicación a fin de resolver el nombre de host del clúster. Para obtener información acerca de cómo configurar un servidor de nombres para la instancia de replicación, consulte Uso de su propio servidor de nombres en las instalaciones. Para obtener más información sobre cómo configurar una red, consulte Configuración de una red para una instancia de replicación.

Cuando utilice un MSK clúster de Amazon, asegúrese de que su grupo de seguridad permita el acceso desde la instancia de replicación. Para obtener información sobre cómo cambiar el grupo de seguridad de un MSK clúster de Amazon, consulte Cambiar el grupo de seguridad de un MSK clúster de Amazon.

AWS Database Migration Service publica registros sobre un tema de Kafka utilizandoJSON. Durante la conversión, AWS DMS serializa cada registro de la base de datos de origen en un par atributo-valor en formato. JSON

Para migrar los datos desde cualquier origen de datos admitido a un clúster de Kafka de destino, se usa la asignación de objetos. Con la asignación de objetos, se determina cómo se estructuran los registros de datos en el tema de destino. También debe definir una clave de partición para cada tabla, que Apache Kafka utiliza para agrupar los datos en particiones.

Actualmente, AWS DMS admite un solo tema por tarea. En el caso de una sola tarea con varias tablas, todos los mensajes van a un solo tema. Cada mensaje incluye una sección de metadatos que identifica el esquema y la tabla de destino. AWS DMS las versiones 3.4.6 y superiores admiten la replicación multitema mediante el mapeo de objetos. Para obtener más información, consulte Replicación multitemática mediante asignación de objetos.

Configuración de punto de enlace de Apache Kafka

Puede especificar los detalles de la conexión mediante la configuración del punto final de la AWS DMS consola o mediante la --kafka-settings opción de. CLI A continuación se indican los requisitos para cada ajuste:

  • Broker: especifique las ubicaciones de uno o más agentes en el clúster de Kafka en forma de una lista separada por comas de cada broker-hostname:port. Un ejemplo es "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Esta configuración puede especificar las ubicaciones de algunos o todos los agentes del clúster. Todos los agentes de clúster se comunican para gestionar la partición de los registros de datos migrados al tema.

  • Topic: (opcional) especifique el nombre del tema con una longitud máxima de 255 letras y símbolos. Puede usar el punto (.), el guion bajo (_) y el signo menos (-). Los nombres de temas con un punto (.) o guion bajo (_) pueden colisionar en las estructuras de datos internas. Puede usar uno de estos símbolos, pero no ambos, en el nombre del tema. Si no especifica un nombre para el tema, "kafka-default-topic" lo AWS DMS utiliza como tema de migración.

    nota

    Para AWS DMS crear un tema de migración que especifique o el tema predeterminado, auto.create.topics.enable = true configúrelo como parte de la configuración del clúster de Kafka. Para obtener más información, consulte Limitaciones al utilizar Apache Kafka como objetivo para AWS Database Migration Service

  • MessageFormat: el formato del resultado de los registros creados en el punto de conexión. El formato del mensaje es JSON (predeterminado) o JSON_UNFORMATTED (una sola línea sin tabulación).

  • MessageMaxBytes: el tamaño máximo en bytes de los registros creados en el punto de conexión. El valor predeterminado es 1 000 000.

    nota

    Solo puede usar AWS CLI/para cambiar SDK MessageMaxBytes a un valor que no sea el predeterminado. Por ejemplo, para modificar el punto de conexión de Kafka existente y cambiar MessageMaxBytes, utilice el siguiente comando.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: proporciona información detallada sobre transacciones de la base de datos de origen. Esta información incluye una marca temporal de confirmación, una posición de registro y valores para transaction_id, previous_transaction_id y transaction_record_id (el desplazamiento del registro dentro de una transacción). El valor predeterminado es false.

  • IncludePartitionValue: muestra el valor de partición dentro de la salida del mensaje de Kafka, a menos que el tipo de partición sea schema-table-type. El valor predeterminado es false.

  • PartitionIncludeSchemaTable: agrega los nombres de los esquemas y de las tablas como prefijo a los valores de partición, cuando el tipo de partición es primary-key-type. Al hacerlo, aumenta la distribución de datos entre las particiones de Kafka. Por ejemplo, supongamos que un esquema SysBench tiene miles de tablas y cada tabla tiene un rango limitado para una clave principal. En este caso, la misma clave principal se envía desde miles de tablas a la misma partición, lo que provoca limitación. El valor predeterminado es false.

  • IncludeTableAlterOperations— Incluye cualquier operación del lenguaje de definición de datos (DDL) que cambie la tabla en los datos de controlrename-table, comodrop-table, add-columndrop-column, yrename-column. El valor predeterminado es false.

  • IncludeControlDetails: muestra información detallada de control para la definición de tablas, la definición de columnas y los cambios de tablas y columnas en la salida del mensaje de Kafka. El valor predeterminado es false.

  • IncludeNullAndEmpty— Incluye NULL y vacía columnas en el destino. El valor predeterminado es false.

  • SecurityProtocol— Establece una conexión segura con un punto final de destino de Kafka mediante Transport Layer Security (TLS). Las opciones incluyen ssl-authentication, ssl-encryption y sasl-ssl. El uso de sasl-ssl requiere SaslUsername y SaslPassword.

  • SslEndpointIdentificationAlgorithm: establece la verificación del nombre de host para el certificado. Esta configuración se admite en AWS DMS versión 3.5.1 y posteriores. Estas son las opciones disponibles:

    • NONE: deshabilita la verificación del nombre de host del agente en la conexión del cliente.

    • HTTPS: habilita la verificación del nombre de host del agente en la conexión del cliente.

  • useLargeIntegerValue— Utilice int de hasta 18 dígitos en lugar de convertir ints como dobles, disponible a partir de la AWS DMS versión 3.5.4. El valor predeterminado es false.

Puede usar la configuración para ayudar a aumentar la velocidad de la transferencia. Para ello, AWS DMS admite la carga completa con varios subprocesos en un clúster de destino de Apache Kafka. AWS DMS admite esta operación con varios subprocesos con las configuraciones de tareas que incluyen lo siguiente:

  • MaxFullLoadSubTasks— Utilice esta opción para indicar el número máximo de tablas de origen que se van a cargar en paralelo. AWS DMS carga cada tabla en su tabla de destino de Kafka correspondiente mediante una subtarea dedicada. El valor predeterminado es 8, el valor máximo es 49.

  • ParallelLoadThreads— Utilice esta opción para especificar el número de subprocesos que se AWS DMS utilizan para cargar cada tabla en su tabla de destino de Kafka. El valor máximo para un destino de Apache Kafka es 32. Puede pedir que se incremente este límite máximo.

  • ParallelLoadBufferSize: utilice esta opción para especificar el número máximo de registros para almacenar en el búfer que los subprocesos de carga en paralelo utilizan para cargar datos en el destino de Kafka. El valor predeterminado es 50. El valor máximo es 1000. Utilice este parámetro con ParallelLoadThreads. ParallelLoadBufferSize es válido solo cuando hay más de un subproceso.

  • ParallelLoadQueuesPerThread: utilice esta opción para especificar el número de colas que acceden a cada subproceso simultáneo para eliminar los registros de datos de las colas y generar una carga por lotes para el destino. El valor predeterminado de es 1. El máximo es 512.

Puede mejorar el rendimiento de la captura de datos de cambios (CDC) para los puntos finales de Kafka ajustando la configuración de las tareas para los subprocesos paralelos y las operaciones masivas. Para ello, puede especificar el número de subprocesos simultáneos, las colas por subproceso y el número de registros que se van a almacenar en un búfer mediante la configuración de tareas ParallelApply*. Por ejemplo, supongamos que desea realizar una CDC carga y aplicar 128 hilos en paralelo. También desea acceder a 64 colas por subproceso, con 50 registros almacenados por búfer.

Para mejorar el CDC rendimiento, AWS DMS es compatible con las siguientes configuraciones de tareas:

  • ParallelApplyThreads— Especifica el número de subprocesos simultáneos que se AWS DMS utilizan durante una CDC carga para enviar los registros de datos a un punto final de Kafka. El valor predeterminado es cero (0) y el valor máximo es 32.

  • ParallelApplyBufferSize— Especifica el número máximo de registros que se deben almacenar en cada cola de búfer para que los subprocesos simultáneos se envíen a un punto final de destino de Kafka durante una carga. CDC El valor predeterminado es 100 y el máximo es 1000. Utilice esta opción cuando ParallelApplyThreads especifique más de un subproceso.

  • ParallelApplyQueuesPerThread— Especifica el número de colas a las que accede cada subproceso para extraer los registros de datos de las colas y generar una carga por lotes para un punto final de Kafka. CDC El valor predeterminado de es 1. El máximo es 512.

Cuando se utiliza la configuración de tareas ParallelApply*, el valor predeterminado de partition-key-type es el valor de primary-key de la tabla, no el valor de schema-name.table-name.

Conexión a Kafka mediante Transport Layer Security () TLS

Un clúster de Kafka acepta conexiones seguras mediante Transport Layer Security ()TLS. ConDMS, puede utilizar cualquiera de las siguientes tres opciones de protocolos de seguridad para proteger una conexión de punto final de Kafka.

SSLcifrado () server-encryption

Los clientes validan la identidad del servidor mediante el certificado del servidor. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

SSLautenticación (mutual-authentication)

El servidor y el cliente validan la identidad entre sí mediante sus propios certificados. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

SASL-SSL (mutual-authentication)

El método de capa de seguridad y autenticación simple (SASL) reemplaza el certificado del cliente por un nombre de usuario y una contraseña para validar la identidad del cliente. En concreto, debe proporcionar un nombre de usuario y una contraseña que el servidor haya registrado para que el servidor pueda validar la identidad de un cliente. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

importante

Apache Kafka y Amazon MSK aceptan certificados resueltos. Esta es una limitación conocida de Kafka y Amazon MSK que hay que abordar. Para obtener más información, consulte Problemas con Apache Kafka, KAFKA -3700.

Si utilizas AmazonMSK, considera la posibilidad de utilizar listas de control de acceso (ACLs) como solución alternativa a esta limitación conocida. Para obtener más información sobre su usoACLs, consulte la ACLs sección Apache Kafka de la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Si utiliza un clúster de Kafka autoadministrado, consulte Comentario del 21 de octubre de 2018 para obtener información sobre la configuración del clúster.

Uso del SSL cifrado con Amazon MSK o un clúster Kafka autogestionado

Puedes usar el SSL cifrado para proteger una conexión de punto final a Amazon MSK o a un clúster de Kafka autogestionado. Al utilizar el método de autenticación SSL cifrada, los clientes validan la identidad de un servidor mediante el certificado del servidor. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

Para usar el SSL cifrado para conectarse a Amazon MSK
  • Establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción ssl-encryption al crear el punto de conexión de Kafka de destino.

    El JSON ejemplo siguiente establece el protocolo de seguridad como SSL cifrado.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Para usar el SSL cifrado en un clúster de Kafka autogestionado
  1. Si utilizas una autoridad de certificación (CA) privada en tu clúster de Kafka local, carga tu certificado de CA privado y obtén un nombre de recurso de Amazon (). ARN

  2. Establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción ssl-encryption al crear el punto de conexión de Kafka de destino. En el siguiente JSON ejemplo, se establece el protocolo de seguridad como. ssl-encryption

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Si utilizas una CA privada, configura SslCaCertificateArn la ARN que obtuviste en el primer paso anterior.

Uso de la autenticación de SSL

Puedes usar la SSL autenticación para proteger una conexión de punto final a Amazon MSK o a un clúster de Kafka autogestionado.

Para habilitar la autenticación y el cifrado de clientes mediante la SSL autenticación para conectarse a AmazonMSK, haga lo siguiente:

  • Prepare una clave privada y un certificado público para Kafka.

  • Cargue los certificados al administrador de DMS certificados.

  • Cree un punto final de destino de Kafka con el certificado correspondiente ARNs especificado en la configuración del punto final de Kafka.

Para preparar una clave privada y un certificado público para Amazon MSK
  1. Cree una EC2 instancia y configure un cliente para que utilice la autenticación tal y como se describe en los pasos 1 a 9 de la sección Autenticación de clientes de la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

    Tras completar estos pasos, dispondrá de un certificado ARN (el certificado público ARN guardado en élACM) y de una clave privada en un kafka.client.keystore.jks archivo.

  2. Obtenga el certificado público y cópielo en el archivo signed-certificate-from-acm.pem mediante el siguiente comando:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    El comando devuelve información similar al siguiente ejemplo:

    {"Certificate": "123", "CertificateChain": "456"}

    A continuación, copie el equivalente de "123" al archivo signed-certificate-from-acm.pem.

  3. Obtenga la clave privada importando la clave msk-rsa desde kafka.client.keystore.jks to keystore.p12, como se muestra en el siguiente ejemplo.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilice el siguiente comando para exportar keystore.p12 a un formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Aparece PEMel mensaje Introduzca la contraseña e identifica la clave que se aplica para cifrar el certificado.

  5. Elimine los atributos de bolsa y los atributos de clave del archivo .pem para asegurarse de que la primera línea comience con la siguiente cadena.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Para cargar un certificado público y una clave privada al administrador de DMS certificados y probar la conexión con Amazon MSK
  1. Cárguelo al administrador de DMS certificados mediante el siguiente comando.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Cree un punto final de MSK destino de Amazon y pruebe la conexión para asegurarse de que la TLS autenticación funciona.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
importante

Puede utilizar la SSL autenticación para proteger una conexión a un clúster de Kafka autogestionado. En algunos casos, es posible que use una entidad de certificación (CA) privada en el clúster de Kafka en las instalaciones. Si es así, cargue la cadena de CA, el certificado público y la clave privada en el administrador de DMS certificados. A continuación, utilice el nombre de recurso de Amazon correspondiente (ARN) en la configuración de su punto de conexión al crear su punto de enlace de destino Kafka local.

Preparación de una clave privada y un certificado firmado para un clúster de Kafka autoadministrado
  1. Genere un par de claves como se muestra en el siguiente ejemplo.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Genere una solicitud de firma de certificado ()CSR.

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Utilice la CA del almacén de confianza de su clúster para firmar elCSR. Si no tiene una entidad de certificación, puede crear su propia entidad de certificación privada.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importe ca-cert en el almacén de confianza y al almacén de claves del servidor. Si no dispone de un almacén de confianza, utilice el siguiente comando para crear el almacén de confianza e importar ca-cert en él.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Firme el certificado.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importe el certificado firmado al almacén de claves.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilice el siguiente comando para importar la clave on-premise-rsa de kafka.server.keystore.jks a keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilice el siguiente comando para exportar keystore.p12 a un formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Cargue encrypted-private-server-key.pem y signed-certificate.pem envíelo ca-cert al administrador de DMS certificados.

  10. Cree un punto final utilizando lo devueltoARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Uso deSASL: SSL autenticación para conectarse a Amazon MSK

El método de capa de seguridad y autenticación simple (SASL) utiliza un nombre de usuario y una contraseña para validar la identidad del cliente y establece una conexión cifrada entre el servidor y el cliente.

Para usarloSASL, primero debes crear un nombre de usuario y una contraseña seguros al configurar tu MSK clúster de Amazon. Para obtener una descripción de cómo configurar un nombre de usuario y una contraseña seguros para un MSK clúster de Amazon, consulte Configuración SASL o SCRAM autenticación para un MSK clúster de Amazon en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

A continuación, cuando cree el punto de conexión de destino de Kafka, establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción sasl-ssl. Establezca también las opciones SaslUsername y SaslPassword. Asegúrate de que sean coherentes con el nombre de usuario y la contraseña seguros que creaste al configurar tu MSK clúster de Amazon por primera vez, como se muestra en el siguiente JSON ejemplo.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
nota
  • Actualmente, solo AWS DMS es compatible con una CA pública respaldada por una CASASL:SSL. DMSno admiteSASL: SSL para su uso con Kafka autogestionado y respaldado por una CA privada.

  • Para la SASL SSL autenticación, AWS DMS admite el mecanismo SCRAM - SHA -512 de forma predeterminada. AWS DMS las versiones 3.5.0 y superiores también admiten el mecanismo Plain. Para admitir el mecanismo Plain, defina el SaslMechanism parámetro del tipo de KafkaSettings API datos en. PLAIN

Utilice una imagen anterior para ver los valores originales de CDC las filas de Apache Kafka como objetivo

Al escribir CDC actualizaciones en un destino de transmisión de datos como Kafka, puede ver los valores originales de la fila de la base de datos de origen antes de cambiarlos mediante una actualización. Para que esto sea posible, AWS DMS rellena una imagen anterior de los eventos de actualización en función de los datos proporcionados por el motor de base de datos de origen.

Los diferentes motores de base de datos de origen proporcionan diferentes cantidades de información para una imagen anterior:

  • Oracle proporciona actualizaciones a las columnas solo si cambian.

  • Postgre solo SQL proporciona datos para las columnas que forman parte de la clave principal (cambiadas o no). Si se utiliza la replicación lógica y REPLICA IDENTITY FULL está configurada para la tabla de origen, puede obtener toda la información del antes y el después de la fila escrita WALs y disponible aquí.

  • SQLPor lo general, My proporciona datos para todas las columnas (cambiadas o no).

Para habilitar imágenes anteriores para agregar valores originales de la base de datos de origen a la salida AWS DMS , utilice la configuración de tarea BeforeImageSettings o el parámetro add-before-image-columns. Este parámetro aplica una regla de transformación de columna.

BeforeImageSettingsagrega un nuevo JSON atributo a cada operación de actualización con valores recopilados del sistema de base de datos de origen, como se muestra a continuación.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
nota

Aplíquelo BeforeImageSettings a CDC las tareas de carga completa (que migran los datos existentes y replican los cambios en curso) o CDC solo a las tareas (que solo replican los cambios de datos). No se aplica BeforeImageSettings a tareas que son solo de carga completa.

Para las opciones de BeforeImageSettings, se aplica lo siguiente:

  • Establezca la opción EnableBeforeImage para habilitar true antes de crear imágenes. El valor predeterminado es false.

  • Utilice la FieldName opción para asignar un nombre al nuevo JSON atributo. Cuando EnableBeforeImage es true, FieldName es necesario y no puede estar vacío.

  • La opción ColumnFilter especifica una columna para agregar mediante el uso de las imágenes anteriores. Para agregar solo columnas que forman parte de las claves principales de la tabla, utilice el valor predeterminado, pk-only. Para añadir solo columnas que no sean de LOB tipo, utilicenon-lob. Para agregar cualquier columna que tenga un valor de imagen anterior, utilice all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Uso de una regla de transformación de imagen anterior

Como alternativa a la configuración de tareas, puede utilizar el parámetro add-before-image-columns, que aplica una regla de transformación de columnas. Con este parámetro, puede activarlo antes de crear imágenes CDC en destinos de transmisión de datos como Kafka.

Al utilizar add-before-image-columns en una regla de transformación, puede aplicar un control más detallado de los resultados de la imagen anterior. Las reglas de transformación permiten utilizar un localizador de objetos que le da control sobre las tablas seleccionadas para la regla. Además, puede encadenar reglas de transformación, lo que permite aplicar diferentes reglas a diferentes tablas. A continuación, puede manipular las columnas producidas utilizando otras reglas.

nota

No utilice el parámetro add-before-image-columns junto con la configuración de tarea BeforeImageSettings dentro de la misma tarea. En su lugar, utilice el parámetro o la configuración, pero no ambos, para una sola tarea.

Un tipo de regla transformation con el parámetro add-before-image-columns de una columna debe proporcionar una sección before-image-def. A continuación se muestra un ejemplo.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

El valor de column-prefix se antepone a un nombre de columna y el valor predeterminado de column-prefix es BI_. El valor de column-suffix se añade al nombre de la columna y el valor predeterminado está vacío. No configure ambas cadenas column-prefix y column-suffix como cadenas vacías.

Elija un valor para column-filter. Para agregar solo columnas que forman parte de las claves principales de la tabla, elija pk-only. Elija non-lob añadir solo columnas que no sean de LOB tipo. O elija all para agregar cualquier columna que tenga un valor de imagen anterior.

Ejemplo de una regla de transformación de imagen anterior

La regla de transformación del siguiente ejemplo agrega una nueva columna llamada BI_emp_no en el destino. Entonces, una instrucción como UPDATE employees SET emp_no = 3 WHERE emp_no = 1; rellena el campo BI_emp_no con 1. Cuando escribe CDC actualizaciones en los destinos de Amazon S3, la BI_emp_no columna permite saber qué fila original se actualizó.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Para obtener información sobre el uso de la acción de regla add-before-image-columns, consulte Reglas y acciones de transformación.

Limitaciones al utilizar Apache Kafka como objetivo para AWS Database Migration Service

Al utilizar Apache Kafka como destino, se aplican las siguientes restricciones:

  • AWS DMS Los puntos finales de destino de Kafka no admiten el control de IAM acceso para Amazon Managed Streaming for Apache Kafka (MSKAmazon).

  • No se admite LOB el modo completo.

  • Especifique un archivo de configuración de Kafka para su clúster con propiedades que AWS DMS permitan crear nuevos temas automáticamente. Incluya el valor auto.create.topics.enable = true. Si utilizas AmazonMSK, puedes especificar la configuración predeterminada al crear tu clúster de Kafka y, a continuación, cambiar la auto.create.topics.enable configuración atrue. Para obtener más información sobre los ajustes de configuración predeterminados, consulte La MSK configuración predeterminada de Amazon en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka. Si necesita modificar un clúster de Kafka existente creado con AmazonMSK, ejecute el AWS CLI comando aws kafka create-configuration para actualizar la configuración de Kafka, como en el siguiente ejemplo:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Aquí, //~/kafka_configuration es el archivo de configuración que ha creado con los valores de propiedades necesarios.

    Si utiliza su propia instancia de Kafka instalada en AmazonEC2, modifique la configuración del clúster de Kafka con la auto.create.topics.enable = true configuración que AWS DMS permita la creación automática de nuevos temas mediante las opciones que se proporcionan con la instancia.

  • AWS DMS publica cada actualización en un único registro de la base de datos de origen como un registro de datos (mensaje) en un tema de Kafka determinado, independientemente de las transacciones.

  • AWS DMS admite las dos formas siguientes de claves de partición:

    • SchemaName.TableName una combinación del nombre de esquema y de tabla.

    • ${AttributeName}: el valor de uno de los campos de la JSON tabla o la clave principal de la tabla en la base de datos de origen.

  • BatchApply no es compatible con un punto de conexión de Kafka. Es posible que el uso de la aplicación por lotes (por ejemplo, la configuración de tareas de metadatos de destino BatchApplyEnabled) para un objetivo de Kafka provoque la pérdida de datos.

  • AWS DMS no admite la migración de valores de tipos de BigInt datos con más de 16 dígitos. Para evitar esta limitación, puede usar la siguiente regla de transformación para convertir la columna BigInt en una cadena. Para obtener más información sobre las reglas de transformación, consulte Reglas y acciones de transformación.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Uso de la asignación de objetos para migrar datos a un tema de Kafka

AWS DMS utiliza reglas de mapeo de tablas para mapear datos del tema de Kafka de origen al de destino. Para asignar datos a un tema de destino, se utiliza un tipo de regla de asignación de tablas denominado asignación de objetos. Puede utilizar el mapeo de objetos para definir cómo los registros de datos del origen se asignan a los registros de datos publicados en un tema de Kafka.

Los temas de Kafka no tienen una estructura predeterminada distinta de una clave de partición.

nota

No tiene que utilizar la asignación de objetos. Puede utilizar la asignación de tablas normal para varias transformaciones. Sin embargo, el tipo de clave de partición seguirá estos comportamientos predeterminados:

  • La clave principal se usa como clave de partición para la carga completa.

  • Si no se utiliza ninguna configuración de tareas de aplicación paralela, schema.table se utiliza como clave de partición para. CDC

  • Si se utiliza la configuración de tareas de aplicación paralela, la clave principal se utiliza como clave de partición para. CDC

Para crear una regla de mapeo de objetos, se especifica rule-type como object-mapping. Esta regla indica el tipo de mapeo de objetos que desea utilizar.

La estructura de la regla es la siguiente.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS actualmente admite map-record-to-record y es map-record-to-document el único valor válido para el rule-action parámetro. Esta configuración afecta a los valores que no están excluidos como parte de la lista de atributos exclude-columns. Los map-record-to-document valores map-record-to-record y especifican cómo se AWS DMS gestionan estos registros de forma predeterminada. Estos valores no afectan a los mapeos de atributos en modo alguno.

Utilice map-record-to-record al migrar desde una base de datos relacional a un tema de Kafka. Este tipo de regla utiliza el valor taskResourceId.schemaName.tableName de la base de datos relacional como la clave de partición en el tema de Kafka y crea un atributo para cada columna de la base de datos de origen.

Cuando utilice map-record-to-record, tenga en cuenta lo siguiente:

  • Esta configuración solo afecta a las columnas excluidas de la lista exclude-columns.

  • Para cada columna de este tipo, AWS DMS crea un atributo correspondiente en el tema de destino.

  • AWS DMS crea el atributo correspondiente independientemente de si la columna de origen se utiliza en una asignación de atributos.

Una forma de entender map-record-to-record es verlo en acción. En este ejemplo, imagine que empieza con una fila de una tabla de base de datos relacional con la estructura y los datos siguientes.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Para migrar esta información desde un esquema denominado Test a un tema de Kafka, cree reglas para mapear los datos al tema. La siguiente regla ilustra la operación de asignación.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Dados un tema de Kafka y una clave de partición (en este caso, taskResourceId.schemaName.tableName), a continuación se ilustra el formato de registro resultante al usar nuestros datos de ejemplo en el tema de destino de Kafka:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Reestructuración de datos con el mapeo de atributos

Puede reestructurar los datos mientras los migra a un tema de Kafka utilizando un mapa de atributos. Por ejemplo, es posible que desee combinar varios campos del origen en un único campo en el destino. El mapa de atributos siguiente ilustra cómo reestructurar los datos.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Para establecer un valor constante para partition-key, especifique un valor de partition-key. Tal vez desee hacer esto, por ejemplo, para obligar a que todos los datos se almacenen en una única partición. El siguiente mapeo ilustra este enfoque.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
nota

El valor partition-key de un registro de control para una tabla específica es TaskId.SchemaName.TableName. El valor partition-key de un registro de control para una tabla específica es el TaskId de ese registro. La especificación de un valor partition-key en el mapeo de objetos no tiene ningún efecto en el elemento partition-key de un registro de control.

Replicación multitemática mediante asignación de objetos

De forma predeterminada, AWS DMS las tareas migran todos los datos de origen a uno de los siguientes temas de Kafka:

  • Como se especifica en el campo Tema del punto final de AWS DMS destino.

  • Como especifica kafka-default-topic si el campo Tema del punto de conexión de destino no está rellenado y la configuración auto.create.topics.enable de Kafka está establecida en true.

Con las versiones 3.4.6 y posteriores AWS DMS del motor, puede usar el kafka-target-topic atributo para asignar cada tabla fuente migrada a un tema diferente. Por ejemplo, las siguientes reglas de asignación de objetos migran las tablas de origen Customer y Address a los temas de Kafka customer_topic y address_topic, respectivamente. Al mismo tiempo, AWS DMS migra todas las demás tablas de origen, incluida la Bills tabla del Test esquema, al tema especificado en el punto final de destino.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Al utilizar la replicación multitema de Kafka, puede agrupar y migrar las tablas de origen para separar los temas de Kafka mediante una única tarea de replicación.

Formato de mensajes para Apache Kafka

El JSON resultado es simplemente una lista de pares clave-valor.

RecordType

El tipo de registro puede ser de datos o de control. Los registros de datos representan las filas reales en el origen. Los registros de control son para eventos importantes de la secuencia como, por ejemplo, el reinicio de una tarea.

Operación

Para los registros de datos, la operación puede ser load, insert, update o delete.

Para los registros de control, la operación puede ser create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column o column-type-change.

SchemaName

El esquema de origen del registro. Este campo puede estar vacío para un registro de control.

TableName

La tabla de origen del registro. Este campo puede estar vacío para un registro de control.

Timestamp

La marca de tiempo del momento en que se creó el JSON mensaje. El campo está formateado con el formato 8601. ISO

El siguiente ejemplo de JSON mensaje ilustra un mensaje de tipo de datos con todos los metadatos adicionales.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

El siguiente ejemplo de JSON mensaje ilustra un mensaje de tipo control.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }