Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utiliser Apache Kafka comme cible pour AWS Database Migration Service
Vous pouvez l'utiliser AWS DMS pour migrer des données vers un cluster Apache Kafka. Apache Kafka est une plateforme de streaming distribuée. Apache Kafka vous permet d’ingérer et de traiter des données de streaming en temps réel.
AWS propose également Amazon Managed Streaming pour Apache Kafka for Apache MSK (Amazon) à utiliser comme AWS DMS cible. Amazon MSK est un service de streaming Apache Kafka entièrement géré qui simplifie la mise en œuvre et la gestion des instances Apache Kafka. Il fonctionne avec les versions open source d'Apache Kafka, et vous accédez aux MSK instances Amazon en tant que AWS DMS cibles, exactement comme n'importe quelle instance Apache Kafka. Pour plus d'informations, consultez Qu'est-ce qu'Amazon MSK ? dans le guide du développeur Amazon Managed Streaming for Apache Kafka.
Un cluster Kafka stocke les flux d’enregistrements dans des catégories appelées « rubriques », divisées en partitions. Les partitions sont des séquences identifiées de manière unique d’enregistrements de données (messages) dans une rubrique. Les partitions peuvent être réparties entre plusieurs agents dans un cluster pour permettre le traitement parallèle des enregistrements d’une rubrique. Pour de plus amples informations sur les rubriques et les partitions et leur distribution dans Apache Kafka, veuillez consulter Rubriques et journaux
Votre cluster Kafka peut être une MSK instance Amazon, un cluster exécuté sur une EC2 instance Amazon ou un cluster sur site. Une MSK instance Amazon ou un cluster sur une EC2 instance Amazon peut se trouver dans la même instance VPC ou dans une autre instance. Dans le cas d’un cluster sur site, vous pouvez utiliser votre propre serveur de noms sur site pour votre instance de réplication afin de résoudre le nom d’hôte du cluster. Pour en savoir plus sur la configuration d’un serveur de noms pour votre instance de réplication, consultez Utilisation de votre propre serveur de noms sur site. Pour plus d’informations sur la configuration d’un réseau, consultez Configuration d'un réseau pour une instance de réplication.
Lorsque vous utilisez un MSK cluster Amazon, assurez-vous que son groupe de sécurité autorise l'accès depuis votre instance de réplication. Pour plus d'informations sur la modification du groupe de sécurité d'un MSK cluster Amazon, consultez Modifier le groupe de sécurité d'un MSK cluster Amazon.
AWS Database Migration Service publie des enregistrements sur un sujet Kafka en utilisantJSON. Au cours de la conversion, AWS DMS sérialise chaque enregistrement de la base de données source en une paire attribut-valeur au format. JSON
Utilisez le mappage d’objet pour migrer vos données de n’importe quelle source de données prise en charge vers un cluster Kafka cible. Avec le mappage d’objet, vous déterminez la façon de structurer les enregistrements de données dans la rubrique cible. Vous définissez également une clé de partition pour chaque table, qu'Apache Kafka utilise pour regrouper les données dans ses partitions.
Actuellement, ne AWS DMS prend en charge qu'un seul sujet par tâche. Pour une seule tâche comportant plusieurs tables, tous les messages sont placés dans une seule rubrique. Chaque message inclut une section de métadonnées qui identifie le schéma et la table cibles. AWS DMS les versions 3.4.6 et supérieures prennent en charge la réplication multisujet à l'aide du mappage d'objets. Pour de plus amples informations, veuillez consulter Réplication à plusieurs rubriques à l’aide du mappage d’objet.
Paramètres du point de terminaison Apache Kafka
Vous pouvez spécifier les détails de connexion par le biais des paramètres du point de terminaison dans la AWS DMS console ou de l'--kafka-settings
option dans leCLI. Les conditions requises pour chaque paramètre sont les suivantes :
-
Broker
: spécifiez les emplacements d’un ou de plusieurs agents dans votre cluster Kafka sous la forme d’une liste séparée par des virgules de tous les éléments
. Par exemple :broker-hostname
:port
"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"
. Ce paramètre peut spécifier les emplacements d’un ou de tous les agents du cluster. Les agents de cluster communiquent tous pour gérer le partitionnement des enregistrements de données migrés vers la rubrique. -
Topic
: (facultatif) spécifiez le nom de rubrique avec une longueur maximale de 255 lettres et symboles. Vous pouvez utiliser le point (.), le trait de soulignement (_) et le moins (-). Les noms de rubrique avec un point (.) ou un trait de soulignement (_) peuvent entrer en collision dans des structures de données internes. Utilisez l'un ou l'autre de ces symboles, mais pas les deux dans le nom de la rubrique. Si vous ne spécifiez pas de nom de rubrique, AWS DMS"kafka-default-topic"
utilisez-le comme rubrique de migration.Note
Pour AWS DMS créer soit un sujet de migration que vous spécifiez, soit le sujet par défaut, défini dans le
auto.create.topics.enable = true
cadre de la configuration de votre cluster Kafka. Pour plus d’informations, consultez Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service. MessageFormat
: format de sortie pour les enregistrements créés sur le point de terminaison. Le format du message estJSON
(par défaut) ouJSON_UNFORMATTED
(une seule ligne sans onglet).-
MessageMaxBytes
: taille maximale en octets des enregistrements créés sur le point de terminaison. La valeur par défaut est 1 000 000.Note
Vous ne pouvez utiliser le AWS CLI/que SDK pour passer
MessageMaxBytes
à une valeur autre que celle par défaut. Par exemple, pour modifier votre point de terminaison Kafka existant et redéfinir la valeur deMessageMaxBytes
, utilisez la commande suivante.aws dms modify-endpoint --endpoint-arn
your-endpoint
--kafka-settings Broker="broker1-server
:broker1-port
,broker2-server
:broker2-port
,...", Topic=topic-name
,MessageMaxBytes=integer-of-max-message-size-in-bytes
IncludeTransactionDetails
: fournit des informations détaillées sur les transactions à partir de la base de données source. Ces informations comprennent un horodatage de validation, une position de journal et des valeurs pourtransaction_id
,previous_transaction_id
ettransaction_record_id
(le décalage d'enregistrement dans une transaction). L’argument par défaut estfalse
.IncludePartitionValue
: affiche la valeur de partition dans la sortie du message Kafka, sauf si le type de partition estschema-table-type
. L’argument par défaut estfalse
.PartitionIncludeSchemaTable
: préfixe les noms de schéma et de table aux valeurs de partition, lorsque le type de partition estprimary-key-type
. Cela augmente la distribution des données entre les partitions Kafka. Par exemple, supposons qu'un schémaSysBench
comporte des milliers de tables et que chaque table n'ait qu'une plage limitée pour une clé primaire. Dans ce cas, la même clé primaire est envoyée à partir de milliers de tables vers la même partition, ce qui provoque une limitation. L’argument par défaut estfalse
.IncludeTableAlterOperations
— Inclut toutes les opérations du langage de définition des données (DDL) qui modifient le tableau dans les données de contrôlerename-table
, telles quedrop-table
add-column
,,drop-column
, etrename-column
. L’argument par défaut estfalse
.IncludeControlDetails
: affiche les informations de contrôle détaillées pour la définition de table, la définition de colonne et les modifications de table et de colonne dans la sortie du message Kafka. L’argument par défaut estfalse
.-
IncludeNullAndEmpty
— Incluez NULL et videz des colonnes dans la cible. L’argument par défaut estfalse
. -
SecurityProtocol
— Définit une connexion sécurisée à un point de terminaison cible Kafka à l'aide de Transport Layer Security (TLS). Les options sontssl-authentication
,ssl-encryption
etsasl-ssl
. L’utilisation desasl-ssl
requiertSaslUsername
etSaslPassword
. -
SslEndpointIdentificationAlgorithm
— Définit la vérification du nom d'hôte pour le certificat. Ce paramètre est pris en charge dans les AWS DMS versions 3.5.1 et ultérieures. Les options disponibles sont les suivantes :NONE
: désactive la vérification du nom d'hôte du broker dans la connexion client.HTTPS
: Activez la vérification du nom d'hôte du broker dans la connexion client.
-
useLargeIntegerValue
— Utilisez jusqu'à 18 chiffres au lieu de convertir les entiers en doubles, disponible à partir de AWS DMS la version 3.5.4. La valeur par défaut est false.
Vous pouvez augmenter la vitesse du transfert dans les paramètres. Pour ce faire, AWS DMS prend en charge un chargement complet multithread sur un cluster cible Apache Kafka. AWS DMS prend en charge ce multithreading avec des paramètres de tâche qui incluent les éléments suivants :
-
MaxFullLoadSubTasks
— Utilisez cette option pour indiquer le nombre maximum de tables sources à charger en parallèle. AWS DMS charge chaque table dans la table cible Kafka correspondante à l'aide d'une sous-tâche dédiée. La valeur par défaut est 8 ; la valeur maximale 49. -
ParallelLoadThreads
— Utilisez cette option pour spécifier le nombre de threads AWS DMS utilisés pour charger chaque table dans sa table cible Kafka. La valeur maximale pour une cible Apache Kafka est 32. Vous pouvez demander une augmentation de cette limite maximale. -
ParallelLoadBufferSize
: utilisez cette option pour spécifier le nombre maximal d’enregistrements à stocker dans la mémoire tampon utilisée par les threads de chargement parallèles pour charger les données dans la cible Kafka. La valeur par défaut est 50. La valeur maximale est 1 000. Utilisez ce paramètre avecParallelLoadThreads
.ParallelLoadBufferSize
est valide uniquement dans le cas de plusieurs threads. -
ParallelLoadQueuesPerThread
: utilisez cette option pour spécifier le nombre de files d’attente auxquelles chaque thread simultané accède pour extraire les enregistrements de données des files d’attente et générer un chargement par lots pour la cible. La valeur par défaut est 1. La valeur maximale est 512.
Vous pouvez améliorer les performances de la capture des données de modification (CDC) pour les points de terminaison Kafka en ajustant les paramètres des tâches pour les threads parallèles et les opérations groupées. Pour ce faire, vous pouvez spécifier le nombre de threads simultanés, les files d'attente par thread et le nombre d'enregistrements à stocker dans un tampon à l'aide de la tâche ParallelApply*
. Supposons, par exemple, que vous souhaitiez effectuer un CDC chargement et appliquer 128 threads en parallèle. Vous souhaitez également accéder à 64 files d'attente par thread, avec 50 enregistrements stockés par tampon.
Pour CDC améliorer les performances, AWS DMS prend en charge les paramètres de tâche suivants :
-
ParallelApplyThreads
— Spécifie le nombre de threads simultanés AWS DMS utilisés lors d'un CDC chargement pour envoyer des enregistrements de données vers un point de terminaison cible Kafka. La valeur par défaut est zéro (0) et la valeur maximale est 32. -
ParallelApplyBufferSize
— Spécifie le nombre maximum d'enregistrements à stocker dans chaque file d'attente tampon pour les threads simultanés à envoyer vers un point de terminaison cible Kafka lors d'un CDC chargement. La valeur par défaut est 100 et la valeur maximale est 1 000. Utilisez cette option lorsqueParallelApplyThreads
spécifie plusieurs threads. -
ParallelApplyQueuesPerThread
— Spécifie le nombre de files d'attente auxquelles chaque thread accède pour extraire des enregistrements de données des files d'attente et générer un chargement par lots pour un point de terminaison Kafka pendant cette période. CDC La valeur par défaut est 1. La valeur maximale est 512.
Lorsque vous utilisez les paramètres de tâche ParallelApply*
, la valeur par défaut partition-key-type
est la valeur primary-key
de la table, pas schema-name.table-name
.
Connexion à Kafka à l'aide de Transport Layer Security () TLS
Un cluster Kafka accepte les connexions sécurisées à l'aide de Transport Layer Security (TLS). AvecDMS, vous pouvez utiliser l'une des trois options de protocole de sécurité suivantes pour sécuriser une connexion à un terminal Kafka.
- SSLchiffrement (
server-encryption
) -
Les clients valident l’identité du serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.
- SSLauthentification (
mutual-authentication
) -
Le serveur et le client valident mutuellement leur identité respective via leurs propres certificats. Une connexion chiffrée est alors établie entre le serveur et le client.
- SASL-SSL (
mutual-authentication
) -
La méthode Simple Authentication and Security Layer (SASL) remplace le certificat du client par un nom d'utilisateur et un mot de passe pour valider l'identité du client. Plus précisément, vous fournissez un nom d’utilisateur et un mot de passe enregistrés par le serveur afin que ce dernier puisse valider l’identité d’un client. Une connexion chiffrée est alors établie entre le serveur et le client.
Important
Apache Kafka et Amazon MSK acceptent les certificats résolus. Il s'agit d'une limitation connue de Kafka et d'Amazon MSK à résoudre. Pour plus d'informations, consultez la section Problèmes liés à Apache Kafka, KAFKA -3700
Si vous utilisez AmazonMSK, pensez à utiliser des listes de contrôle d'accès (ACLs) pour contourner cette limitation connue. Pour plus d'informations sur son utilisationACLs, consultez la ACLs section Apache Kafka du guide du développeur Amazon Managed Streaming for Apache Kafka.
Si vous utilisez un cluster Kafka autogéré, consultez Comment dated 21/Oct/18
Utilisation du SSL chiffrement avec Amazon MSK ou un cluster Kafka autogéré
Vous pouvez utiliser SSL le chiffrement pour sécuriser la connexion d'un terminal à Amazon MSK ou à un cluster Kafka autogéré. Lorsque vous utilisez la méthode d'authentification par SSL chiffrement, les clients valident l'identité d'un serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.
Pour utiliser SSL le chiffrement pour vous connecter à Amazon MSK
-
Définissez le paramètre de point de terminaison du protocole de sécurité (
SecurityProtocol
) à l’aide de l’optionssl-encryption
lorsque vous créez votre point de terminaison Kafka cible.L'JSONexemple suivant définit le protocole de sécurité en tant que SSL cryptage.
"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Pour utiliser SSL le chiffrement pour un cluster Kafka autogéré
-
Si vous utilisez une autorité de certification (CA) privée dans votre cluster Kafka sur site, téléchargez votre certificat CA privé et obtenez un nom de ressource Amazon (). ARN
-
Définissez le paramètre de point de terminaison du protocole de sécurité (
SecurityProtocol
) à l’aide de l’optionssl-encryption
lorsque vous créez votre point de terminaison Kafka cible. L'JSONexemple suivant définit le protocole de sécurité commessl-encryption
."KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
-
Si vous utilisez une autorité de certification privée, définissez
SslCaCertificateArn
le paramètre ARN que vous avez obtenu à la première étape ci-dessus.
Utilisation de l'SSLauthentification
Vous pouvez utiliser l'SSLauthentification pour sécuriser la connexion d'un terminal à Amazon MSK ou à un cluster Kafka autogéré.
Pour activer l'authentification et le chiffrement du client à l'aide de l'SSLauthentification pour se connecter à AmazonMSK, procédez comme suit :
-
Préparez une clé privée et un certificat public pour Kafka.
-
Téléchargez les certificats dans le gestionnaire de DMS certificats.
-
Créez un point de terminaison cible Kafka avec le certificat correspondant ARNs spécifié dans les paramètres du point de terminaison Kafka.
Pour préparer une clé privée et un certificat public pour Amazon MSK
-
Créez une EC2 instance et configurez un client pour qu'il utilise l'authentification, comme décrit dans les étapes 1 à 9 de la section Authentification du client du manuel Amazon Managed Streaming for Apache Kafka Developer Guide.
Une fois ces étapes terminées, vous disposez d'un certificat ARN (le certificat public ARN enregistré dansACM) et d'une clé privée contenus dans un
kafka.client.keystore.jks
fichier. -
Obtenez le certificat public et copiez-le dans le fichier
signed-certificate-from-acm.pem
à l’aide de la commande suivante :aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
Cette commande renvoie des informations semblables à celles de l’exemple suivant :
{"Certificate": "123", "CertificateChain": "456"}
Vous copiez ensuite votre équivalent de
"123"
dans le fichiersigned-certificate-from-acm.pem
. -
Pour obtenir la clé privée, importez la clé
msk-rsa
à partir dekafka.client.keystore.jks to keystore.p12
, comme illustré dans l’exemple suivant.keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
-
Utilisez la commande suivante pour exporter
keystore.p12
au format.pem
.Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
Le message Enter PEM Passphrase apparaît et identifie la clé appliquée pour chiffrer le certificat.
-
Supprimez les attributs de conteneur et les attributs de clé du fichier
.pem
pour vous assurer que la première ligne commence par la chaîne suivante.---BEGIN ENCRYPTED PRIVATE KEY---
Pour télécharger un certificat public et une clé privée dans le gestionnaire de DMS certificats et tester la connexion à Amazon MSK
-
Téléchargez vers le gestionnaire de DMS certificats à l'aide de la commande suivante.
aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://
path to signed cert
aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
-
Créez un point de terminaison MSK cible Amazon et testez la connexion pour vous assurer que TLS l'authentification fonctionne.
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Important
Vous pouvez utiliser l'SSLauthentification pour sécuriser une connexion à un cluster Kafka autogéré. Dans certains cas, vous devrez peut-être utiliser une autorité de certification (CA) privée dans votre cluster Kafka sur site. Si tel est le cas, téléchargez votre chaîne d'autorité de certification, votre certificat public et votre clé privée dans le gestionnaire de DMS certificats. Utilisez ensuite le nom de ressource Amazon correspondant (ARN) dans les paramètres de votre point de terminaison lorsque vous créez votre point de terminaison cible Kafka sur site.
Pour préparer une clé privée et un certificat signé pour un cluster Kafka autogéré
-
Générez une paire de clés comme indiqué dans l’exemple suivant.
keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass
your-keystore-password
-keypassyour-key-passphrase
-dname "CN=your-cn-name
" -aliasalias-of-key-pair
-storetype pkcs12 -keyalg RSA -
Générez une demande de signature de certificat (CSR).
keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass
your-key-store-password
-keypassyour-key-password
-
Utilisez l'autorité de certification de votre cluster pour signer leCSR. Si vous n’avez pas de CA, vous pouvez créer votre propre CA privée.
openssl req -new -x509 -keyout ca-key -out ca-cert -days
validate-days
-
Importez
ca-cert
dans le magasin de clés de confiance et le magasin de clés du serveur. Si vous n’avez pas de magasin de clés de confiance, utilisez la commande suivante pour le créer et y importerca-cert
.keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
-
Signez le certificat.
openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days
validate-days
-CAcreateserial -passin pass:ca-password
-
Importez le certificat signé dans le magasin de clés.
keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass
your-keystore-password
-keypassyour-key-password
-
Utilisez la commande suivante pour importer la clé
on-premise-rsa
dekafka.server.keystore.jks
danskeystore.p12
.keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass
your-truststore-password
\ -destkeypassyour-key-password
-
Utilisez la commande suivante pour exporter
keystore.p12
au format.pem
.Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
-
Téléchargez
encrypted-private-server-key.pem
signed-certificate.pem
, etca-cert
vers le gestionnaire DMS de certificats. -
Créez un point de terminaison en utilisant le renvoyéARNs.
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "
your-client-cert-arn
","SslClientKeyArn": "your-client-key-arn
","SslClientKeyPassword":"your-client-key-password
", "SslCaCertificateArn": "your-ca-certificate-arn
"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Utilisation SASL de SSL l'authentification pour se connecter à Amazon MSK
La méthode Simple Authentication and Security Layer (SASL) utilise un nom d'utilisateur et un mot de passe pour valider l'identité d'un client et établit une connexion cryptée entre le serveur et le client.
Pour l'utiliserSASL, vous devez d'abord créer un nom d'utilisateur et un mot de passe sécurisés lorsque vous configurez votre MSK cluster Amazon. Pour savoir comment configurer un nom d'utilisateur et un mot de passe sécurisés pour un MSK cluster Amazon, consultez la section Configuration SASL et SCRAM authentification pour un MSK cluster Amazon dans le guide du développeur Amazon Managed Streaming for Apache Kafka.
Ensuite, lorsque vous créez votre point de terminaison cible Kafka, définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol
) à l’aide de l’option sasl-ssl
. Vous définissez également les options SaslUsername
et SaslPassword
. Assurez-vous qu'ils correspondent au nom d'utilisateur et au mot de passe sécurisés que vous avez créés lors de la première configuration de votre MSK cluster Amazon, comme illustré dans l'JSONexemple suivant.
"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"
Amazon MSK cluster secure user name
", "SaslPassword":"Amazon MSK cluster secure password
" }
Note
-
Actuellement, ne AWS DMS prend en charge que le support public soutenu par une autorité de certification SASL -SSL. DMSne prend pas en chargeSASL, SSL à utiliser avec Kafka autogéré soutenu par une autorité de certification privée.
-
Pour SSL l'authentification SASL -, AWS DMS prend en charge le mécanisme SCRAM - SHA -512 par défaut. AWS DMS les versions 3.5.0 et supérieures prennent également en charge le mécanisme Plain. Pour prendre en charge le mécanisme Plain, définissez le
SaslMechanism
paramètre du type deKafkaSettings
API données surPLAIN
.
Utilisation d'une image précédente pour afficher les valeurs originales des CDC lignes pour Apache Kafka en tant que cible
Lorsque vous rédigez CDC des mises à jour sur une cible de diffusion de données telle que Kafka, vous pouvez afficher les valeurs d'origine d'une ligne de base de données source avant de les modifier par une mise à jour. Pour ce faire, AWS DMS remplit une image antérieure des événements de mise à jour en fonction des données fournies par le moteur de base de données source.
Différents moteurs de base de données source fournissent différentes quantités d'informations pour une image antérieure :
-
Oracle met uniquement à jour des colonnes si elles changent.
-
Postgre SQL fournit uniquement des données pour les colonnes qui font partie de la clé primaire (modifiées ou non). Si la réplication logique est utilisée et REPLICA IDENTITY FULL est définie pour la table source, vous pouvez obtenir des informations complètes avant et après sur la ligne écrite dans le WALs et disponibles ici.
-
My fournit SQL généralement des données pour toutes les colonnes (modifiées ou non).
Pour activer avant l'imagerie pour ajouter des valeurs d'origine de la base de données source à la sortie AWS DMS , utilisez le paramètre de tâche BeforeImageSettings
ou le paramètre add-before-image-columns
. Ce paramètre applique une règle de transformation de colonne.
BeforeImageSettings
ajoute un nouvel JSON attribut à chaque opération de mise à jour avec les valeurs collectées dans le système de base de données source, comme indiqué ci-dessous.
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Note
S'applique BeforeImageSettings
aux CDC tâches Full Load Plus (qui migrent les données existantes et répliquent les modifications en cours), ou CDC uniquement aux tâches (qui répliquent uniquement les modifications de données). N’appliquez pas les BeforeImageSettings
aux tâches à pleine charge uniquement.
Pour les options BeforeImageSettings
, les conditions suivantes s'appliquent :
-
Définissez l'option
EnableBeforeImage
surtrue
pour activer la génération d’image antérieure. L’argument par défaut estfalse
. -
Utilisez
FieldName
cette option pour attribuer un nom au nouvel JSON attribut. QuandEnableBeforeImage
esttrue
,FieldName
est obligatoire et ne peut pas être vide. -
L'option
ColumnFilter
spécifie une colonne à ajouter en utilisant la génération d’image antérieure. Pour ajouter uniquement des colonnes faisant partie des clés primaires de la table, utilisez la valeur par défaut,pk-only
. Pour ajouter uniquement des colonnes qui ne sont pas de LOB type, utiliseznon-lob
. Pour ajouter une colonne ayant une valeur d'image antérieure, utilisezall
."BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
Utilisation d'une règle de transformation d'image antérieure
Au lieu des paramètres de tâche, vous pouvez utiliser le paramètre add-before-image-columns
, qui applique une règle de transformation de colonne. Avec ce paramètre, vous pouvez l'activer avant de créer des images CDC sur des cibles de diffusion de données telles que Kafka.
En utilisant add-before-image-columns
dans une règle de transformation, vous pouvez exercer un contrôle plus précis sur les résultats de l'image antérieure. Les règles de transformation vous permettent d'utiliser un localisateur d'objets qui vous fournit un contrôle sur les tables sélectionnées pour la règle. En outre, vous pouvez enchaîner les règles de transformation, ce qui permet d'appliquer différentes règles à différentes tables. Vous pouvez ensuite manipuler les colonnes produites à l'aide d'autres règles.
Note
N'utilisez pas le paramètre add-before-image-columns
avec le paramètre de tâche BeforeImageSettings
dans la même tâche. N’utilisez pas les deux, pour une seule tâche.
Un type de règle transformation
avec le paramètre add-before-image-columns
d'une colonne doit fournir une section before-image-def
. Vous en trouverez un exemple ci-dessous.
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
La valeur de column-prefix
est ajoutée à un nom de colonne et la valeur par défaut de column-prefix
est BI_
. La valeur de column-suffix
est ajoutée au nom de la colonne et la valeur par défaut est vide. Ne définissez pas les deux column-prefix
et column-suffix
sur des chaînes vides.
Choisissez une valeur pour column-filter
. Pour ajouter uniquement les colonnes qui font partie des clés primaires de la table, choisissez pk-only
. Choisissez non-lob
de n'ajouter que des colonnes qui ne sont pas de LOB type. Ou choisissez all
d'ajouter une colonne qui a une valeur d’image antérieure.
Exemple de règle de transformation d'image antérieure
La règle de transformation de l'exemple suivant ajoute une nouvelle colonne appelée BI_emp_no
dans la cible. Ainsi, une instruction comme UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
remplit le champ BI_emp_no
avec 1. Lorsque vous rédigez CDC des mises à jour pour les cibles Amazon S3, la BI_emp_no
colonne permet de savoir quelle ligne d'origine a été mise à jour.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
Pour plus d'informations sur l'utilisation de l'action de règle add-before-image-columns
, consultez Règles et actions de transformation.
Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service
Les limitations suivantes s'appliquent lorsque vous utilisez apache Kafka comme cible :
-
AWS DMS Les points de terminaison cibles de Kafka ne prennent pas en charge le contrôle IAM d'accès pour Amazon Managed Streaming for Apache Kafka (MSKAmazon).
-
LOBLe mode complet n'est pas pris en charge.
-
Spécifiez un fichier de configuration Kafka pour votre cluster avec des propriétés permettant AWS DMS de créer automatiquement de nouveaux sujets. Incluez le paramètre,
auto.create.topics.enable = true
. Si vous utilisez AmazonMSK, vous pouvez spécifier la configuration par défaut lorsque vous créez votre cluster Kafka, puis modifier leauto.create.topics.enable
paramètre sur.true
Pour plus d'informations sur les paramètres de configuration par défaut, consultez la section La MSK configuration par défaut d'Amazon Managed Streaming for Apache Kafka Developer Guide. Si vous devez modifier un cluster Kafka existant créé à l'aide d'AmazonMSK, exécutez la AWS CLI commandeaws kafka create-configuration
pour mettre à jour votre configuration Kafka, comme dans l'exemple suivant :14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }
Ici,
//~/kafka_configuration
est le fichier de configuration que vous avez créé avec les paramètres de propriété requis.Si vous utilisez votre propre instance Kafka installée sur AmazonEC2, modifiez la configuration du cluster Kafka avec le
auto.create.topics.enable = true
paramètre permettant de AWS DMS créer automatiquement de nouveaux sujets, en utilisant les options fournies avec votre instance. -
AWS DMS publie chaque mise à jour d'un seul enregistrement de la base de données source sous la forme d'un enregistrement de données (message) dans un sujet Kafka donné, quelles que soient les transactions.
-
AWS DMS prend en charge les deux formes suivantes pour les clés de partition :
-
SchemaName.TableName
: une combinaison du nom du schéma et du nom de la table. -
${AttributeName}
: valeur de l'un des champs de la JSON table ou clé primaire de celle-ci dans la base de données source.
-
-
BatchApply
n’est pas pris en charge pour un point de terminaison Kafka. L’utilisation de l’application par lots (par exemple, le paramètre de tâche de métadonnées cibleBatchApplyEnabled
) pour une cible Kafka peut entraîner une perte de données. -
AWS DMS ne prend pas en charge la migration de valeurs de type de
BigInt
données comportant plus de 16 chiffres. Pour contourner cette limitation, vous pouvez utiliser la règle de transformation suivante pour convertir la colonneBigInt
en chaîne. Pour plus d’informations sur les règles de transformation, consultez Règles et actions de transformation.{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
Utilisation du mappage d’objet pour migrer les données vers une rubrique Kafka
AWS DMS utilise des règles de mappage de tables pour mapper les données de la source au sujet Kafka cible. Pour mapper des données à une rubrique cible, vous utilisez un type de règle de mappage de table appelé « mappage d’objet ». Vous utilisez le mappage d'objet pour définir la façon dont les enregistrements de données de la source sont mappés sur les enregistrements de données publiés dans une rubrique Kafka.
Les rubriques Kafka ne disposent pas d'une structure prédéfinie autre que le fait d'avoir une clé de partition.
Note
Vous n’avez pas besoin d’utiliser le mappage d’objet. Vous pouvez utiliser un mappage de table standard pour différentes transformations. Cependant, le type de clé de partition suivra les comportements par défaut suivants :
-
La clé primaire est utilisée comme clé de partition pour le chargement complet.
-
Si aucun paramètre de tâche à application parallèle n'est utilisé,
schema.table
il est utilisé comme clé de partition pour. CDC -
Si des paramètres de tâche à application parallèle sont utilisés, la clé primaire est utilisée comme clé de partition pour. CDC
Pour créer une règle de mappage d'objet, spécifiez rule-type
comme object-mapping
. Cette règle spécifie le type de mappage d'objet que vous souhaitez utiliser.
La structure de la règle est la suivante.
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS prend actuellement en charge map-record-to-record
et map-record-to-document
en tant que seules valeurs valides pour le rule-action
paramètre. Ces paramètres affectent les valeurs qui ne sont pas exclues de la liste d’attributs exclude-columns
. Les map-record-to-document
valeurs map-record-to-record
et indiquent comment ces AWS DMS enregistrements sont gérés par défaut. Ces valeurs n'affectent en aucune façon les mappages d'attributs.
Utilisez map-record-to-record
lors d'une migration d'une base de données relationnelle vers une rubrique Kafka. Ce type de règle utilise la valeur taskResourceId.schemaName.tableName
de la base de données relationnelle comme clé de partition dans la rubrique Kafka, et crée un attribut pour chaque colonne dans la base de données source.
Lorsque vous utilisez map-record-to-record
, notez ce qui suit :
Ce paramètre n’affecte que les colonnes exclues par la liste
exclude-columns
.Pour chacune de ces colonnes, AWS DMS crée un attribut correspondant dans le sujet cible.
AWS DMS crée cet attribut correspondant, que la colonne source soit utilisée ou non dans un mappage d'attributs.
Une manière de comprendre map-record-to-record
est de le voir en action. Dans cet exemple, imaginons que vous commencez avec une ligne de table d'une base de données relationnelle, présentant la structure et les données suivantes :
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
02/29/1988 |
Pour migrer ces informations à partir d'un schéma nommé Test
vers une rubrique Kafka, vous créez des règles pour mapper les données sur la rubrique cible. La règle suivante illustre ce mappage.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
Compte tenu d’une rubrique Kafka et d’une clé de partition (dans ce cas, taskResourceId.schemaName.tableName
), l’exemple ci-dessous illustre le format d’enregistrement résultant à l’aide de nos exemples de données dans la rubrique cible Kafka :
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
Rubriques
Restructuration de données avec le mappage d'attribut
Vous pouvez restructurer les données lors de leur migration vers une rubrique Kafka à l'aide d'un mappage d'attribut. Par exemple, vous pourriez vouloir regrouper plusieurs champs de la source en un seul champ dans la cible. Le mappage d'attribut suivant illustre comment restructurer les données.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
Pour définir une valeur constante pour partition-key
, spécifiez une valeur partition-key
. Par exemple, vous pouvez le faire pour forcer le stockage de toutes les données dans une seule partition. Le mappage suivant illustre cette approche.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Note
La valeur partition-key
d'un enregistrement de contrôle correspondant à une table spécifique est TaskId.SchemaName.TableName
. La valeur partition-key
d'un enregistrement de contrôle correspondant à une tâche spécifique est le TaskId
de cet enregistrement. La spécification d'une valeur partition-key
dans le mappage d'objet n'a aucun impact sur la partition-key
d'un enregistrement de contrôle.
Réplication à plusieurs rubriques à l’aide du mappage d’objet
Par défaut, les AWS DMS tâches migrent toutes les données sources vers l'une des rubriques Kafka suivantes :
Comme indiqué dans le champ Rubrique du point de terminaison AWS DMS cible.
Comme indiqué par
kafka-default-topic
si le champ Rubrique du point de terminaison cible n’est pas renseigné et que le paramètreauto.create.topics.enable
Kafka est défini surtrue
.
Avec les versions 3.4.6 et supérieures AWS DMS du moteur, vous pouvez utiliser l'kafka-target-topic
attribut pour associer chaque table source migrée à une rubrique distincte. Par exemple, les règles de mappage d’objet suivantes migrent les tables sources Customer
et Address
vers les rubriques Kafka customer_topic
et address_topic
, respectivement. Dans le même temps, AWS DMS migre toutes les autres tables sources, y compris la Bills
table du Test
schéma, vers le sujet spécifié dans le point de terminaison cible.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }
En utilisant la réplication à plusieurs rubriques Kafka, vous pouvez regrouper et migrer les tables sources afin de séparer les rubriques Kafka à l’aide d’une seule tâche de réplication.
Format de message pour Apache Kafka
Le JSON résultat est simplement une liste de paires clé-valeur.
- RecordType
-
Les enregistrements peuvent être de type Données ou Contrôle. Les enregistrements de données représentent les lignes réelles de la source. Les enregistrements de contrôle sont destinés à des événements importants dans le flux, par exemple un redémarrage de la tâche.
- Opération
-
Pour les enregistrements de données, l'opération peut être
load
,insert
,update
oudelete
.Pour les enregistrements de contrôle, l’opération peut être
create-table
,rename-table
,drop-table
,change-columns
,add-column
,drop-column
,rename-column
oucolumn-type-change
. - SchemaName
-
Schéma source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.
- TableName
-
Table source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.
- Horodatage
-
L'horodatage du moment où le JSON message a été créé. Le champ est formaté au format ISO 8601.
L'exemple de JSON message suivant illustre un message de type de données avec toutes les métadonnées supplémentaires.
{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }
L'exemple de JSON message suivant illustre un message de type contrôle.
{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }