Utiliser Apache Kafka comme cible pour AWS Database Migration Service - AWS Service de Migration de Base de Données

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser Apache Kafka comme cible pour AWS Database Migration Service

Vous pouvez l'utiliser AWS DMS pour migrer des données vers un cluster Apache Kafka. Apache Kafka est une plateforme de streaming distribuée. Apache Kafka vous permet d’ingérer et de traiter des données de streaming en temps réel.

AWS propose également Amazon Managed Streaming pour Apache Kafka for Apache MSK (Amazon) à utiliser comme AWS DMS cible. Amazon MSK est un service de streaming Apache Kafka entièrement géré qui simplifie la mise en œuvre et la gestion des instances Apache Kafka. Il fonctionne avec les versions open source d'Apache Kafka, et vous accédez aux MSK instances Amazon en tant que AWS DMS cibles, exactement comme n'importe quelle instance Apache Kafka. Pour plus d'informations, consultez Qu'est-ce qu'Amazon MSK ? dans le guide du développeur Amazon Managed Streaming for Apache Kafka.

Un cluster Kafka stocke les flux d’enregistrements dans des catégories appelées « rubriques », divisées en partitions. Les partitions sont des séquences identifiées de manière unique d’enregistrements de données (messages) dans une rubrique. Les partitions peuvent être réparties entre plusieurs agents dans un cluster pour permettre le traitement parallèle des enregistrements d’une rubrique. Pour de plus amples informations sur les rubriques et les partitions et leur distribution dans Apache Kafka, veuillez consulter Rubriques et journaux et distribution.

Votre cluster Kafka peut être une MSK instance Amazon, un cluster exécuté sur une EC2 instance Amazon ou un cluster sur site. Une MSK instance Amazon ou un cluster sur une EC2 instance Amazon peut se trouver dans la même instance VPC ou dans une autre instance. Dans le cas d’un cluster sur site, vous pouvez utiliser votre propre serveur de noms sur site pour votre instance de réplication afin de résoudre le nom d’hôte du cluster. Pour en savoir plus sur la configuration d’un serveur de noms pour votre instance de réplication, consultez Utilisation de votre propre serveur de noms sur site. Pour plus d’informations sur la configuration d’un réseau, consultez Configuration d'un réseau pour une instance de réplication.

Lorsque vous utilisez un MSK cluster Amazon, assurez-vous que son groupe de sécurité autorise l'accès depuis votre instance de réplication. Pour plus d'informations sur la modification du groupe de sécurité d'un MSK cluster Amazon, consultez Modifier le groupe de sécurité d'un MSK cluster Amazon.

AWS Database Migration Service publie des enregistrements sur un sujet Kafka en utilisantJSON. Au cours de la conversion, AWS DMS sérialise chaque enregistrement de la base de données source en une paire attribut-valeur au format. JSON

Utilisez le mappage d’objet pour migrer vos données de n’importe quelle source de données prise en charge vers un cluster Kafka cible. Avec le mappage d’objet, vous déterminez la façon de structurer les enregistrements de données dans la rubrique cible. Vous définissez également une clé de partition pour chaque table, qu'Apache Kafka utilise pour regrouper les données dans ses partitions.

Actuellement, ne AWS DMS prend en charge qu'un seul sujet par tâche. Pour une seule tâche comportant plusieurs tables, tous les messages sont placés dans une seule rubrique. Chaque message inclut une section de métadonnées qui identifie le schéma et la table cibles. AWS DMS les versions 3.4.6 et supérieures prennent en charge la réplication multisujet à l'aide du mappage d'objets. Pour de plus amples informations, veuillez consulter Réplication à plusieurs rubriques à l’aide du mappage d’objet.

Paramètres du point de terminaison Apache Kafka

Vous pouvez spécifier les détails de connexion par le biais des paramètres du point de terminaison dans la AWS DMS console ou de l'--kafka-settingsoption dans leCLI. Les conditions requises pour chaque paramètre sont les suivantes :

  • Broker : spécifiez les emplacements d’un ou de plusieurs agents dans votre cluster Kafka sous la forme d’une liste séparée par des virgules de tous les éléments broker-hostname:port. Par exemple : "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Ce paramètre peut spécifier les emplacements d’un ou de tous les agents du cluster. Les agents de cluster communiquent tous pour gérer le partitionnement des enregistrements de données migrés vers la rubrique.

  • Topic : (facultatif) spécifiez le nom de rubrique avec une longueur maximale de 255 lettres et symboles. Vous pouvez utiliser le point (.), le trait de soulignement (_) et le moins (-). Les noms de rubrique avec un point (.) ou un trait de soulignement (_) peuvent entrer en collision dans des structures de données internes. Utilisez l'un ou l'autre de ces symboles, mais pas les deux dans le nom de la rubrique. Si vous ne spécifiez pas de nom de rubrique, AWS DMS "kafka-default-topic" utilisez-le comme rubrique de migration.

    Note

    Pour AWS DMS créer soit un sujet de migration que vous spécifiez, soit le sujet par défaut, défini dans le auto.create.topics.enable = true cadre de la configuration de votre cluster Kafka. Pour plus d’informations, consultez Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service.

  • MessageFormat : format de sortie pour les enregistrements créés sur le point de terminaison. Le format du message est JSON (par défaut) ou JSON_UNFORMATTED (une seule ligne sans onglet).

  • MessageMaxBytes : taille maximale en octets des enregistrements créés sur le point de terminaison. La valeur par défaut est 1 000 000.

    Note

    Vous ne pouvez utiliser le AWS CLI/que SDK pour passer MessageMaxBytes à une valeur autre que celle par défaut. Par exemple, pour modifier votre point de terminaison Kafka existant et redéfinir la valeur de MessageMaxBytes, utilisez la commande suivante.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails : fournit des informations détaillées sur les transactions à partir de la base de données source. Ces informations comprennent un horodatage de validation, une position de journal et des valeurs pour transaction_id, previous_transaction_id et transaction_record_id(le décalage d'enregistrement dans une transaction). L’argument par défaut est false.

  • IncludePartitionValue : affiche la valeur de partition dans la sortie du message Kafka, sauf si le type de partition est schema-table-type. L’argument par défaut est false.

  • PartitionIncludeSchemaTable : préfixe les noms de schéma et de table aux valeurs de partition, lorsque le type de partition est primary-key-type. Cela augmente la distribution des données entre les partitions Kafka. Par exemple, supposons qu'un schéma SysBench comporte des milliers de tables et que chaque table n'ait qu'une plage limitée pour une clé primaire. Dans ce cas, la même clé primaire est envoyée à partir de milliers de tables vers la même partition, ce qui provoque une limitation. L’argument par défaut est false.

  • IncludeTableAlterOperations— Inclut toutes les opérations du langage de définition des données (DDL) qui modifient le tableau dans les données de contrôlerename-table, telles que drop-tableadd-column,,drop-column, etrename-column. L’argument par défaut est false.

  • IncludeControlDetails : affiche les informations de contrôle détaillées pour la définition de table, la définition de colonne et les modifications de table et de colonne dans la sortie du message Kafka. L’argument par défaut est false.

  • IncludeNullAndEmpty— Incluez NULL et videz des colonnes dans la cible. L’argument par défaut est false.

  • SecurityProtocol— Définit une connexion sécurisée à un point de terminaison cible Kafka à l'aide de Transport Layer Security (TLS). Les options sont ssl-authentication, ssl-encryption et sasl-ssl. L’utilisation de sasl-ssl requiert SaslUsername et SaslPassword.

  • SslEndpointIdentificationAlgorithm— Définit la vérification du nom d'hôte pour le certificat. Ce paramètre est pris en charge dans les AWS DMS versions 3.5.1 et ultérieures. Les options disponibles sont les suivantes :

    • NONE: désactive la vérification du nom d'hôte du broker dans la connexion client.

    • HTTPS: Activez la vérification du nom d'hôte du broker dans la connexion client.

  • useLargeIntegerValue— Utilisez jusqu'à 18 chiffres au lieu de convertir les entiers en doubles, disponible à partir de AWS DMS la version 3.5.4. La valeur par défaut est false.

Vous pouvez augmenter la vitesse du transfert dans les paramètres. Pour ce faire, AWS DMS prend en charge un chargement complet multithread sur un cluster cible Apache Kafka. AWS DMS prend en charge ce multithreading avec des paramètres de tâche qui incluent les éléments suivants :

  • MaxFullLoadSubTasks— Utilisez cette option pour indiquer le nombre maximum de tables sources à charger en parallèle. AWS DMS charge chaque table dans la table cible Kafka correspondante à l'aide d'une sous-tâche dédiée. La valeur par défaut est 8 ; la valeur maximale 49.

  • ParallelLoadThreads— Utilisez cette option pour spécifier le nombre de threads AWS DMS utilisés pour charger chaque table dans sa table cible Kafka. La valeur maximale pour une cible Apache Kafka est 32. Vous pouvez demander une augmentation de cette limite maximale.

  • ParallelLoadBufferSize : utilisez cette option pour spécifier le nombre maximal d’enregistrements à stocker dans la mémoire tampon utilisée par les threads de chargement parallèles pour charger les données dans la cible Kafka. La valeur par défaut est 50. La valeur maximale est 1 000. Utilisez ce paramètre avec ParallelLoadThreads. ParallelLoadBufferSize est valide uniquement dans le cas de plusieurs threads.

  • ParallelLoadQueuesPerThread : utilisez cette option pour spécifier le nombre de files d’attente auxquelles chaque thread simultané accède pour extraire les enregistrements de données des files d’attente et générer un chargement par lots pour la cible. La valeur par défaut est 1. La valeur maximale est 512.

Vous pouvez améliorer les performances de la capture des données de modification (CDC) pour les points de terminaison Kafka en ajustant les paramètres des tâches pour les threads parallèles et les opérations groupées. Pour ce faire, vous pouvez spécifier le nombre de threads simultanés, les files d'attente par thread et le nombre d'enregistrements à stocker dans un tampon à l'aide de la tâche ParallelApply*. Supposons, par exemple, que vous souhaitiez effectuer un CDC chargement et appliquer 128 threads en parallèle. Vous souhaitez également accéder à 64 files d'attente par thread, avec 50 enregistrements stockés par tampon.

Pour CDC améliorer les performances, AWS DMS prend en charge les paramètres de tâche suivants :

  • ParallelApplyThreads— Spécifie le nombre de threads simultanés AWS DMS utilisés lors d'un CDC chargement pour envoyer des enregistrements de données vers un point de terminaison cible Kafka. La valeur par défaut est zéro (0) et la valeur maximale est 32.

  • ParallelApplyBufferSize— Spécifie le nombre maximum d'enregistrements à stocker dans chaque file d'attente tampon pour les threads simultanés à envoyer vers un point de terminaison cible Kafka lors d'un CDC chargement. La valeur par défaut est 100 et la valeur maximale est 1 000. Utilisez cette option lorsque ParallelApplyThreads spécifie plusieurs threads.

  • ParallelApplyQueuesPerThread— Spécifie le nombre de files d'attente auxquelles chaque thread accède pour extraire des enregistrements de données des files d'attente et générer un chargement par lots pour un point de terminaison Kafka pendant cette période. CDC La valeur par défaut est 1. La valeur maximale est 512.

Lorsque vous utilisez les paramètres de tâche ParallelApply*, la valeur par défaut partition-key-type est la valeur primary-key de la table, pas schema-name.table-name.

Connexion à Kafka à l'aide de Transport Layer Security () TLS

Un cluster Kafka accepte les connexions sécurisées à l'aide de Transport Layer Security (TLS). AvecDMS, vous pouvez utiliser l'une des trois options de protocole de sécurité suivantes pour sécuriser une connexion à un terminal Kafka.

SSLchiffrement (server-encryption)

Les clients valident l’identité du serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.

SSLauthentification (mutual-authentication)

Le serveur et le client valident mutuellement leur identité respective via leurs propres certificats. Une connexion chiffrée est alors établie entre le serveur et le client.

SASL-SSL (mutual-authentication)

La méthode Simple Authentication and Security Layer (SASL) remplace le certificat du client par un nom d'utilisateur et un mot de passe pour valider l'identité du client. Plus précisément, vous fournissez un nom d’utilisateur et un mot de passe enregistrés par le serveur afin que ce dernier puisse valider l’identité d’un client. Une connexion chiffrée est alors établie entre le serveur et le client.

Important

Apache Kafka et Amazon MSK acceptent les certificats résolus. Il s'agit d'une limitation connue de Kafka et d'Amazon MSK à résoudre. Pour plus d'informations, consultez la section Problèmes liés à Apache Kafka, KAFKA -3700.

Si vous utilisez AmazonMSK, pensez à utiliser des listes de contrôle d'accès (ACLs) pour contourner cette limitation connue. Pour plus d'informations sur son utilisationACLs, consultez la ACLs section Apache Kafka du guide du développeur Amazon Managed Streaming for Apache Kafka.

Si vous utilisez un cluster Kafka autogéré, consultez Comment dated 21/Oct/18 pour en savoir plus sur la configuration de votre cluster.

Utilisation du SSL chiffrement avec Amazon MSK ou un cluster Kafka autogéré

Vous pouvez utiliser SSL le chiffrement pour sécuriser la connexion d'un terminal à Amazon MSK ou à un cluster Kafka autogéré. Lorsque vous utilisez la méthode d'authentification par SSL chiffrement, les clients valident l'identité d'un serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.

Pour utiliser SSL le chiffrement pour vous connecter à Amazon MSK
  • Définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option ssl-encryption lorsque vous créez votre point de terminaison Kafka cible.

    L'JSONexemple suivant définit le protocole de sécurité en tant que SSL cryptage.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Pour utiliser SSL le chiffrement pour un cluster Kafka autogéré
  1. Si vous utilisez une autorité de certification (CA) privée dans votre cluster Kafka sur site, téléchargez votre certificat CA privé et obtenez un nom de ressource Amazon (). ARN

  2. Définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option ssl-encryption lorsque vous créez votre point de terminaison Kafka cible. L'JSONexemple suivant définit le protocole de sécurité commessl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Si vous utilisez une autorité de certification privée, définissez SslCaCertificateArn le paramètre ARN que vous avez obtenu à la première étape ci-dessus.

Utilisation de l'SSLauthentification

Vous pouvez utiliser l'SSLauthentification pour sécuriser la connexion d'un terminal à Amazon MSK ou à un cluster Kafka autogéré.

Pour activer l'authentification et le chiffrement du client à l'aide de l'SSLauthentification pour se connecter à AmazonMSK, procédez comme suit :

  • Préparez une clé privée et un certificat public pour Kafka.

  • Téléchargez les certificats dans le gestionnaire de DMS certificats.

  • Créez un point de terminaison cible Kafka avec le certificat correspondant ARNs spécifié dans les paramètres du point de terminaison Kafka.

Pour préparer une clé privée et un certificat public pour Amazon MSK
  1. Créez une EC2 instance et configurez un client pour qu'il utilise l'authentification, comme décrit dans les étapes 1 à 9 de la section Authentification du client du manuel Amazon Managed Streaming for Apache Kafka Developer Guide.

    Une fois ces étapes terminées, vous disposez d'un certificat ARN (le certificat public ARN enregistré dansACM) et d'une clé privée contenus dans un kafka.client.keystore.jks fichier.

  2. Obtenez le certificat public et copiez-le dans le fichier signed-certificate-from-acm.pem à l’aide de la commande suivante :

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Cette commande renvoie des informations semblables à celles de l’exemple suivant :

    {"Certificate": "123", "CertificateChain": "456"}

    Vous copiez ensuite votre équivalent de "123" dans le fichier signed-certificate-from-acm.pem.

  3. Pour obtenir la clé privée, importez la clé msk-rsa à partir de kafka.client.keystore.jks to keystore.p12, comme illustré dans l’exemple suivant.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilisez la commande suivante pour exporter keystore.p12 au format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Le message Enter PEM Passphrase apparaît et identifie la clé appliquée pour chiffrer le certificat.

  5. Supprimez les attributs de conteneur et les attributs de clé du fichier .pem pour vous assurer que la première ligne commence par la chaîne suivante.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Pour télécharger un certificat public et une clé privée dans le gestionnaire de DMS certificats et tester la connexion à Amazon MSK
  1. Téléchargez vers le gestionnaire de DMS certificats à l'aide de la commande suivante.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Créez un point de terminaison MSK cible Amazon et testez la connexion pour vous assurer que TLS l'authentification fonctionne.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Important

Vous pouvez utiliser l'SSLauthentification pour sécuriser une connexion à un cluster Kafka autogéré. Dans certains cas, vous devrez peut-être utiliser une autorité de certification (CA) privée dans votre cluster Kafka sur site. Si tel est le cas, téléchargez votre chaîne d'autorité de certification, votre certificat public et votre clé privée dans le gestionnaire de DMS certificats. Utilisez ensuite le nom de ressource Amazon correspondant (ARN) dans les paramètres de votre point de terminaison lorsque vous créez votre point de terminaison cible Kafka sur site.

Pour préparer une clé privée et un certificat signé pour un cluster Kafka autogéré
  1. Générez une paire de clés comme indiqué dans l’exemple suivant.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Générez une demande de signature de certificat (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Utilisez l'autorité de certification de votre cluster pour signer leCSR. Si vous n’avez pas de CA, vous pouvez créer votre propre CA privée.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importez ca-cert dans le magasin de clés de confiance et le magasin de clés du serveur. Si vous n’avez pas de magasin de clés de confiance, utilisez la commande suivante pour le créer et y importer ca-cert .

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Signez le certificat.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importez le certificat signé dans le magasin de clés.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilisez la commande suivante pour importer la clé on-premise-rsa de kafka.server.keystore.jks dans keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilisez la commande suivante pour exporter keystore.p12 au format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Téléchargez encrypted-private-server-key.pemsigned-certificate.pem, et ca-cert vers le gestionnaire DMS de certificats.

  10. Créez un point de terminaison en utilisant le renvoyéARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Utilisation SASL de SSL l'authentification pour se connecter à Amazon MSK

La méthode Simple Authentication and Security Layer (SASL) utilise un nom d'utilisateur et un mot de passe pour valider l'identité d'un client et établit une connexion cryptée entre le serveur et le client.

Pour l'utiliserSASL, vous devez d'abord créer un nom d'utilisateur et un mot de passe sécurisés lorsque vous configurez votre MSK cluster Amazon. Pour savoir comment configurer un nom d'utilisateur et un mot de passe sécurisés pour un MSK cluster Amazon, consultez la section Configuration SASL et SCRAM authentification pour un MSK cluster Amazon dans le guide du développeur Amazon Managed Streaming for Apache Kafka.

Ensuite, lorsque vous créez votre point de terminaison cible Kafka, définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option sasl-ssl. Vous définissez également les options SaslUsername et SaslPassword. Assurez-vous qu'ils correspondent au nom d'utilisateur et au mot de passe sécurisés que vous avez créés lors de la première configuration de votre MSK cluster Amazon, comme illustré dans l'JSONexemple suivant.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Note
  • Actuellement, ne AWS DMS prend en charge que le support public soutenu par une autorité de certification SASL -SSL. DMSne prend pas en chargeSASL, SSL à utiliser avec Kafka autogéré soutenu par une autorité de certification privée.

  • Pour SSL l'authentification SASL -, AWS DMS prend en charge le mécanisme SCRAM - SHA -512 par défaut. AWS DMS les versions 3.5.0 et supérieures prennent également en charge le mécanisme Plain. Pour prendre en charge le mécanisme Plain, définissez le SaslMechanism paramètre du type de KafkaSettings API données surPLAIN.

Utilisation d'une image précédente pour afficher les valeurs originales des CDC lignes pour Apache Kafka en tant que cible

Lorsque vous rédigez CDC des mises à jour sur une cible de diffusion de données telle que Kafka, vous pouvez afficher les valeurs d'origine d'une ligne de base de données source avant de les modifier par une mise à jour. Pour ce faire, AWS DMS remplit une image antérieure des événements de mise à jour en fonction des données fournies par le moteur de base de données source.

Différents moteurs de base de données source fournissent différentes quantités d'informations pour une image antérieure :

  • Oracle met uniquement à jour des colonnes si elles changent.

  • Postgre SQL fournit uniquement des données pour les colonnes qui font partie de la clé primaire (modifiées ou non). Si la réplication logique est utilisée et REPLICA IDENTITY FULL est définie pour la table source, vous pouvez obtenir des informations complètes avant et après sur la ligne écrite dans le WALs et disponibles ici.

  • My fournit SQL généralement des données pour toutes les colonnes (modifiées ou non).

Pour activer avant l'imagerie pour ajouter des valeurs d'origine de la base de données source à la sortie AWS DMS , utilisez le paramètre de tâche BeforeImageSettings ou le paramètre add-before-image-columns. Ce paramètre applique une règle de transformation de colonne.

BeforeImageSettingsajoute un nouvel JSON attribut à chaque opération de mise à jour avec les valeurs collectées dans le système de base de données source, comme indiqué ci-dessous.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Note

S'applique BeforeImageSettings aux CDC tâches Full Load Plus (qui migrent les données existantes et répliquent les modifications en cours), ou CDC uniquement aux tâches (qui répliquent uniquement les modifications de données). N’appliquez pas les BeforeImageSettings aux tâches à pleine charge uniquement.

Pour les options BeforeImageSettings, les conditions suivantes s'appliquent :

  • Définissez l'option EnableBeforeImage sur true pour activer la génération d’image antérieure. L’argument par défaut est false.

  • Utilisez FieldName cette option pour attribuer un nom au nouvel JSON attribut. Quand EnableBeforeImage est true, FieldName est obligatoire et ne peut pas être vide.

  • L'option ColumnFilter spécifie une colonne à ajouter en utilisant la génération d’image antérieure. Pour ajouter uniquement des colonnes faisant partie des clés primaires de la table, utilisez la valeur par défaut, pk-only. Pour ajouter uniquement des colonnes qui ne sont pas de LOB type, utiliseznon-lob. Pour ajouter une colonne ayant une valeur d'image antérieure, utilisez all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Utilisation d'une règle de transformation d'image antérieure

Au lieu des paramètres de tâche, vous pouvez utiliser le paramètre add-before-image-columns, qui applique une règle de transformation de colonne. Avec ce paramètre, vous pouvez l'activer avant de créer des images CDC sur des cibles de diffusion de données telles que Kafka.

En utilisant add-before-image-columns dans une règle de transformation, vous pouvez exercer un contrôle plus précis sur les résultats de l'image antérieure. Les règles de transformation vous permettent d'utiliser un localisateur d'objets qui vous fournit un contrôle sur les tables sélectionnées pour la règle. En outre, vous pouvez enchaîner les règles de transformation, ce qui permet d'appliquer différentes règles à différentes tables. Vous pouvez ensuite manipuler les colonnes produites à l'aide d'autres règles.

Note

N'utilisez pas le paramètre add-before-image-columns avec le paramètre de tâche BeforeImageSettings dans la même tâche. N’utilisez pas les deux, pour une seule tâche.

Un type de règle transformation avec le paramètre add-before-image-columns d'une colonne doit fournir une section before-image-def. Vous en trouverez un exemple ci-dessous.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

La valeur de column-prefix est ajoutée à un nom de colonne et la valeur par défaut de column-prefix est BI_. La valeur de column-suffix est ajoutée au nom de la colonne et la valeur par défaut est vide. Ne définissez pas les deux column-prefix et column-suffix sur des chaînes vides.

Choisissez une valeur pour column-filter. Pour ajouter uniquement les colonnes qui font partie des clés primaires de la table, choisissez pk-only . Choisissez non-lob de n'ajouter que des colonnes qui ne sont pas de LOB type. Ou choisissez all d'ajouter une colonne qui a une valeur d’image antérieure.

Exemple de règle de transformation d'image antérieure

La règle de transformation de l'exemple suivant ajoute une nouvelle colonne appelée BI_emp_no dans la cible. Ainsi, une instruction comme UPDATE employees SET emp_no = 3 WHERE emp_no = 1; remplit le champ BI_emp_no avec 1. Lorsque vous rédigez CDC des mises à jour pour les cibles Amazon S3, la BI_emp_no colonne permet de savoir quelle ligne d'origine a été mise à jour.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Pour plus d'informations sur l'utilisation de l'action de règle add-before-image-columns, consultez Règles et actions de transformation.

Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service

Les limitations suivantes s'appliquent lorsque vous utilisez apache Kafka comme cible :

  • AWS DMS Les points de terminaison cibles de Kafka ne prennent pas en charge le contrôle IAM d'accès pour Amazon Managed Streaming for Apache Kafka (MSKAmazon).

  • LOBLe mode complet n'est pas pris en charge.

  • Spécifiez un fichier de configuration Kafka pour votre cluster avec des propriétés permettant AWS DMS de créer automatiquement de nouveaux sujets. Incluez le paramètre, auto.create.topics.enable = true. Si vous utilisez AmazonMSK, vous pouvez spécifier la configuration par défaut lorsque vous créez votre cluster Kafka, puis modifier le auto.create.topics.enable paramètre sur. true Pour plus d'informations sur les paramètres de configuration par défaut, consultez la section La MSK configuration par défaut d'Amazon Managed Streaming for Apache Kafka Developer Guide. Si vous devez modifier un cluster Kafka existant créé à l'aide d'AmazonMSK, exécutez la AWS CLI commande aws kafka create-configuration pour mettre à jour votre configuration Kafka, comme dans l'exemple suivant :

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Ici, //~/kafka_configuration est le fichier de configuration que vous avez créé avec les paramètres de propriété requis.

    Si vous utilisez votre propre instance Kafka installée sur AmazonEC2, modifiez la configuration du cluster Kafka avec le auto.create.topics.enable = true paramètre permettant de AWS DMS créer automatiquement de nouveaux sujets, en utilisant les options fournies avec votre instance.

  • AWS DMS publie chaque mise à jour d'un seul enregistrement de la base de données source sous la forme d'un enregistrement de données (message) dans un sujet Kafka donné, quelles que soient les transactions.

  • AWS DMS prend en charge les deux formes suivantes pour les clés de partition :

    • SchemaName.TableName : une combinaison du nom du schéma et du nom de la table.

    • ${AttributeName}: valeur de l'un des champs de la JSON table ou clé primaire de celle-ci dans la base de données source.

  • BatchApply n’est pas pris en charge pour un point de terminaison Kafka. L’utilisation de l’application par lots (par exemple, le paramètre de tâche de métadonnées cible BatchApplyEnabled) pour une cible Kafka peut entraîner une perte de données.

  • AWS DMS ne prend pas en charge la migration de valeurs de type de BigInt données comportant plus de 16 chiffres. Pour contourner cette limitation, vous pouvez utiliser la règle de transformation suivante pour convertir la colonne BigInt en chaîne. Pour plus d’informations sur les règles de transformation, consultez Règles et actions de transformation.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilisation du mappage d’objet pour migrer les données vers une rubrique Kafka

AWS DMS utilise des règles de mappage de tables pour mapper les données de la source au sujet Kafka cible. Pour mapper des données à une rubrique cible, vous utilisez un type de règle de mappage de table appelé « mappage d’objet ». Vous utilisez le mappage d'objet pour définir la façon dont les enregistrements de données de la source sont mappés sur les enregistrements de données publiés dans une rubrique Kafka.

Les rubriques Kafka ne disposent pas d'une structure prédéfinie autre que le fait d'avoir une clé de partition.

Note

Vous n’avez pas besoin d’utiliser le mappage d’objet. Vous pouvez utiliser un mappage de table standard pour différentes transformations. Cependant, le type de clé de partition suivra les comportements par défaut suivants :

  • La clé primaire est utilisée comme clé de partition pour le chargement complet.

  • Si aucun paramètre de tâche à application parallèle n'est utilisé, schema.table il est utilisé comme clé de partition pour. CDC

  • Si des paramètres de tâche à application parallèle sont utilisés, la clé primaire est utilisée comme clé de partition pour. CDC

Pour créer une règle de mappage d'objet, spécifiez rule-type comme object-mapping. Cette règle spécifie le type de mappage d'objet que vous souhaitez utiliser.

La structure de la règle est la suivante.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS prend actuellement en charge map-record-to-record et map-record-to-document en tant que seules valeurs valides pour le rule-action paramètre. Ces paramètres affectent les valeurs qui ne sont pas exclues de la liste d’attributs exclude-columns. Les map-record-to-document valeurs map-record-to-record et indiquent comment ces AWS DMS enregistrements sont gérés par défaut. Ces valeurs n'affectent en aucune façon les mappages d'attributs.

Utilisez map-record-to-record lors d'une migration d'une base de données relationnelle vers une rubrique Kafka. Ce type de règle utilise la valeur taskResourceId.schemaName.tableName de la base de données relationnelle comme clé de partition dans la rubrique Kafka, et crée un attribut pour chaque colonne dans la base de données source.

Lorsque vous utilisez map-record-to-record, notez ce qui suit :

  • Ce paramètre n’affecte que les colonnes exclues par la liste exclude-columns.

  • Pour chacune de ces colonnes, AWS DMS crée un attribut correspondant dans le sujet cible.

  • AWS DMS crée cet attribut correspondant, que la colonne source soit utilisée ou non dans un mappage d'attributs.

Une manière de comprendre map-record-to-record est de le voir en action. Dans cet exemple, imaginons que vous commencez avec une ligne de table d'une base de données relationnelle, présentant la structure et les données suivantes :

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Pour migrer ces informations à partir d'un schéma nommé Test vers une rubrique Kafka, vous créez des règles pour mapper les données sur la rubrique cible. La règle suivante illustre ce mappage.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Compte tenu d’une rubrique Kafka et d’une clé de partition (dans ce cas, taskResourceId.schemaName.tableName), l’exemple ci-dessous illustre le format d’enregistrement résultant à l’aide de nos exemples de données dans la rubrique cible Kafka :

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Restructuration de données avec le mappage d'attribut

Vous pouvez restructurer les données lors de leur migration vers une rubrique Kafka à l'aide d'un mappage d'attribut. Par exemple, vous pourriez vouloir regrouper plusieurs champs de la source en un seul champ dans la cible. Le mappage d'attribut suivant illustre comment restructurer les données.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Pour définir une valeur constante pour partition-key, spécifiez une valeur partition-key. Par exemple, vous pouvez le faire pour forcer le stockage de toutes les données dans une seule partition. Le mappage suivant illustre cette approche.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Note

La valeur partition-key d'un enregistrement de contrôle correspondant à une table spécifique est TaskId.SchemaName.TableName. La valeur partition-key d'un enregistrement de contrôle correspondant à une tâche spécifique est le TaskId de cet enregistrement. La spécification d'une valeur partition-key dans le mappage d'objet n'a aucun impact sur la partition-key d'un enregistrement de contrôle.

Réplication à plusieurs rubriques à l’aide du mappage d’objet

Par défaut, les AWS DMS tâches migrent toutes les données sources vers l'une des rubriques Kafka suivantes :

  • Comme indiqué dans le champ Rubrique du point de terminaison AWS DMS cible.

  • Comme indiqué par kafka-default-topic si le champ Rubrique du point de terminaison cible n’est pas renseigné et que le paramètre auto.create.topics.enable Kafka est défini sur true.

Avec les versions 3.4.6 et supérieures AWS DMS du moteur, vous pouvez utiliser l'kafka-target-topicattribut pour associer chaque table source migrée à une rubrique distincte. Par exemple, les règles de mappage d’objet suivantes migrent les tables sources Customer et Address vers les rubriques Kafka customer_topic et address_topic, respectivement. Dans le même temps, AWS DMS migre toutes les autres tables sources, y compris la Bills table du Test schéma, vers le sujet spécifié dans le point de terminaison cible.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

En utilisant la réplication à plusieurs rubriques Kafka, vous pouvez regrouper et migrer les tables sources afin de séparer les rubriques Kafka à l’aide d’une seule tâche de réplication.

Format de message pour Apache Kafka

Le JSON résultat est simplement une liste de paires clé-valeur.

RecordType

Les enregistrements peuvent être de type Données ou Contrôle. Les enregistrements de données représentent les lignes réelles de la source. Les enregistrements de contrôle sont destinés à des événements importants dans le flux, par exemple un redémarrage de la tâche.

Opération

Pour les enregistrements de données, l'opération peut être load,insert, update ou delete.

Pour les enregistrements de contrôle, l’opération peut être create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column ou column-type-change.

SchemaName

Schéma source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.

TableName

Table source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.

Horodatage

L'horodatage du moment où le JSON message a été créé. Le champ est formaté au format ISO 8601.

L'exemple de JSON message suivant illustre un message de type de données avec toutes les métadonnées supplémentaires.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

L'exemple de JSON message suivant illustre un message de type contrôle.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }