Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwendung von Apache Kafka als Ziel für AWS Database Migration Service
Sie können es verwenden AWS DMS , um Daten zu einem Apache Kafka-Cluster zu migrieren. Apache Kafka ist eine verteilte Streaming-Plattform. Mit Apache Kafka können Sie Streaming-Daten in Echtzeit erfassen und verarbeiten.
AWS bietet auch Amazon Managed Streaming for Apache Kafka (AmazonMSK) zur Verwendung als AWS DMS Ziel an. Amazon MSK ist ein vollständig verwalteter Apache Kafka-Streaming-Service, der die Implementierung und Verwaltung von Apache Kafka-Instances vereinfacht. Es funktioniert mit Open-Source-Versionen von Apache Kafka, und Sie greifen genau wie jede Apache Kafka-Instance auf MSK Amazon-Instances als AWS DMS Ziele zu. Weitere Informationen finden Sie unter Was ist AmazonMSK? im Amazon Managed Streaming for Apache Kafka Developer Guide.
Ein Kafka-Cluster speichert Streams von Datensätzen in Kategorien, die als Themen bezeichnet und in Partitionen unterteilt werden. Partitionen sind eindeutig identifizierte Sequenzen von Datensätzen (Nachrichten) in einem Thema. Partitionen können über mehrere Broker in einem Cluster verteilt werden, um die parallele Verarbeitung der Datensätze eines Themas zu ermöglichen. Weitere Informationen zu Themen und Partitionen und ihrer Verteilung in Apache Kafka finden Sie unter Themen und Protokolle
Ihr Kafka-Cluster kann entweder eine MSK Amazon-Instance, ein Cluster, der auf einer EC2 Amazon-Instance läuft, oder ein lokaler Cluster sein. Eine MSK Amazon-Instance oder ein Cluster auf einer EC2 Amazon-Instance kann sich in derselben VPC oder einer anderen Instance befinden. Wenn es sich um einen On-Premises-Cluster handelt, können Sie Ihren eigenen On-Premises-Namensserver für Ihre Replikations-Instance verwenden, um den Host-Namen des Clusters aufzulösen. Informationen zum Einrichten eines Namensservers für Ihre Replikations-Instance finden Sie unter Verwenden Ihres eigenen Vor-Ort-Nameservers. Weitere Informationen zum Einrichten eines Netzwerks finden Sie unter Einrichten eines Netzwerks für eine Replikations-Instance.
Wenn Sie einen MSK Amazon-Cluster verwenden, stellen Sie sicher, dass dessen Sicherheitsgruppe den Zugriff von Ihrer Replikationsinstanz aus ermöglicht. Informationen zum Ändern der Sicherheitsgruppe für einen MSK Amazon-Cluster finden Sie unter Ändern der Sicherheitsgruppe eines MSK Amazon-Clusters.
AWS Database Migration Service veröffentlicht Datensätze zu einem Kafka-Thema mitJSON. AWS DMS Serialisiert während der Konvertierung jeden Datensatz aus der Quelldatenbank in ein Attribut-Wert-Paar im Format. JSON
Zum Migrieren Ihrer Daten von einer unterstützten Datenquelle zu einem Kafka-Ziel-Cluster verwenden Sie die Objektzuweisung. Mit der Objektzuweisung bestimmen Sie, wie die Datensätze im Zielthema strukturiert werden sollen. Außerdem definieren Sie einen Partitionsschlüssel für jede Tabelle, die Apache Kafka zum Gruppieren der Daten in seine Partitionen verwendet.
AWS DMS Unterstützt derzeit ein einzelnes Thema pro Aufgabe. Bei einer einzelnen Aufgabe mit mehreren Tabellen beziehen sich alle Nachrichten auf ein einzelnes Thema. Jede Nachricht enthält einen Metadaten-Abschnitt, der das Zielschema und die Zieltabelle identifiziert. AWS DMS Versionen 3.4.6 und höher unterstützen die themenübergreifende Replikation mithilfe von Objektzuordnung. Weitere Informationen finden Sie unter Replikation für mehrere Themen mithilfe der Objektzuweisung.
Apache Kafka-Endpunkteinstellungen
Sie können Verbindungsdetails über die Endpunkteinstellungen in der AWS DMS Konsole oder über die --kafka-settings
Option in der angeben. CLI Es folgen die Anforderungen für jede Einstellung:
-
Broker
– Geben Sie die Speicherorte eines oder mehrerer Broker in Ihrem Kafka-Cluster jeweils in Form einer durch Kommata getrennten Liste mit
an. Ein Beispiel istbroker-hostname
:port
"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"
. Mit dieser Einstellung können Sie die Speicherorte einiger oder aller Broker im Cluster angeben. Die Cluster-Broker kommunizieren miteinander, um die Partitionierung von Datensätzen zu verarbeiten, die zu dem Thema migriert wurden. -
Topic
– (Optional) Geben Sie den Themennamen mit einer maximalen Länge von 255 Buchstaben und Symbolen an. Sie können Punkt (.), Unterstrich (_) und Minuszeichen (-) verwenden. Themennamen mit einem Punkt (.) oder einem Unterstrich (_) können in internen Datenstrukturen kollidieren. Verwenden Sie entweder eines, aber nicht beide dieser Symbole im Namen des Themas. Wenn Sie keinen Themennamen angeben, AWS DMS verwendet"kafka-default-topic"
als Migrationsthema.Anmerkung
Wenn Sie entweder ein von Ihnen angegebenes Migrationsthema oder das Standardthema AWS DMS erstellen möchten, das
auto.create.topics.enable = true
als Teil Ihrer Kafka-Cluster-Konfiguration festgelegt wurde. Weitere Informationen finden Sie unter Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service MessageFormat
– Das Ausgabeformat für die Datensätze, die auf dem Endpunkt erstellt wurden. Das Nachrichtenformat istJSON
(Standard) oderJSON_UNFORMATTED
(eine einzelne Zeile ohne Tabulator).-
MessageMaxBytes
– Die maximale Größe in Byte für Datensätze, die auf dem Endpunkt erstellt wurden. Der Standardwert ist 1.000.000.Anmerkung
Sie können den Wert AWS CLI/nur verwenden, um zu einem SDK Wert
MessageMaxBytes
zu wechseln, der nicht dem Standard entspricht. Verwenden Sie beispielsweise den folgenden Befehl, um Ihren vorhandenen Kafka-Endpunkt undMessageMaxBytes
zu ändern.aws dms modify-endpoint --endpoint-arn
your-endpoint
--kafka-settings Broker="broker1-server
:broker1-port
,broker2-server
:broker2-port
,...", Topic=topic-name
,MessageMaxBytes=integer-of-max-message-size-in-bytes
IncludeTransactionDetails
– Stellt detaillierte Transaktionsinformationen aus der Quelldatenbank bereit. Diese Informationen beinhalten einen Durchführungszeitstempel, eine Protokollposition sowie Werte fürtransaction_id
,previous_transaction_id
undtransaction_record_id
(den Datensatzoffset innerhalb einer Transaktion). Der Standardwert istfalse
.IncludePartitionValue
– Zeigt den Partitionswert innerhalb der Kafka-Nachrichtenausgabe an, es sei denn, der Partitionstyp istschema-table-type
. Der Standardwert istfalse
.PartitionIncludeSchemaTable
– Fügt Schema- und Tabellennamen zu Partitionswerten als Präfix hinzu, wenn der Partitionstypprimary-key-type
ist. Dadurch wird die Datenverteilung zwischen Kafka-Partitionen erhöht. Angenommen, einSysBench
-Schema hat Tausende von Tabellen und jede davon hat nur einen begrenzten Bereich für einen Primärschlüssel. In diesem Fall wird derselbe Primärschlüssel von Tausenden von Tabellen an dieselbe Partition gesendet, was zu einer Drosselung führt. Der Standardwert istfalse
.IncludeTableAlterOperations
— Schließt alle Operationen der Datendefinitionssprache (DDL) ein, die die Tabelle in den Steuerdaten ändern, z. B.rename-table
drop-table
,,add-column
drop-column
, undrename-column
. Der Standardwert istfalse
.IncludeControlDetails
– Zeigt detaillierte Steuerungsinformationen für Tabellendefinition, Spaltendefinition und Tabellen- und Spaltenänderungen in der Kafka-Nachrichtenausgabe an. Der Standardwert istfalse
.-
IncludeNullAndEmpty
— Schließt Spalten in das Ziel ein NULL und leert sie. Der Standardwert istfalse
. -
SecurityProtocol
— Stellt mithilfe von Transport Layer Security (TLS) eine sichere Verbindung zu einem Kafka-Zielendpunkt her. Optionen:ssl-authentication
,ssl-encryption
undsasl-ssl
. Für die Verwendung vonsasl-ssl
sindSaslUsername
undSaslPassword
erforderlich. -
SslEndpointIdentificationAlgorithm
— Legt die Überprüfung des Hostnamens für das Zertifikat fest. Diese Einstellung wird in AWS DMS Version 3.5.1 und höher unterstützt. Es gibt die folgenden Optionen:NONE
: Deaktiviert die Überprüfung des Hostnamens des Brokers in der Client-Verbindung.HTTPS
: Aktiviert die Überprüfung des Hostnamens des Brokers in der Client-Verbindung.
-
useLargeIntegerValue
— Verwenden Sie bis zu 18-stellige Ganzzahlen, anstatt Ganzzahlen als Doppelzahl umzuwandeln. Dies ist ab AWS DMS Version 3.5.4 verfügbar. Der Standardwert lautet „false“.
Sie können Einstellungen verwenden, um die Übertragungsgeschwindigkeit zu erhöhen. Dazu unterstützt AWS DMS Multi-Thread-Volllastvorgänge in einen Ziel-Cluster in Apache Kafka. AWS DMS unterstützt Multi-Threading u. a. mithilfe der folgenden Aufgabeneinstellungen:
-
MaxFullLoadSubTasks
— Verwenden Sie diese Option, um die maximale Anzahl von Quelltabellen anzugeben, die parallel geladen werden sollen. AWS DMS lädt jede Tabelle mithilfe einer speziellen Unteraufgabe in die entsprechende Kafka-Zieltabelle. Der Standardwert beträgt 8; der Maximalwert beträgt 49. -
ParallelLoadThreads
— Verwenden Sie diese Option, um die Anzahl der Threads anzugeben, die AWS DMS verwendet werden, um jede Tabelle in ihre Kafka-Zieltabelle zu laden. Der maximale Wert für ein Apache Kafka-Ziel ist 32. Sie können eine Erhöhung dieses Höchstwerts anfordern. -
ParallelLoadBufferSize
– Verwenden Sie diese Option, um die maximale Anzahl der Datensätze anzugeben, die in dem Puffer gespeichert werden sollen, den die parallelen Lade-Threads zum Laden von Daten in das Kafka-Ziel verwenden. Der Standardwert lautet 50. Die maximale Wert ist 1.000. Verwenden Sie diese Einstellung mitParallelLoadThreads
;ParallelLoadBufferSize
ist nur gültig, wenn es mehr als einen Thread gibt. -
ParallelLoadQueuesPerThread
– Verwenden Sie diese Option, um die Anzahl der Warteschlangen anzugeben, auf die jeder gleichzeitige Thread zugreift, um Datensätze aus Warteschlangen zu entfernen und eine Stapellast für das Ziel zu generieren. Der Standardwert ist 1. Der Höchstwert ist 512.
Sie können die Leistung von Change Data Capture (CDC) für Kafka-Endpunkte verbessern, indem Sie die Aufgabeneinstellungen für parallel Threads und Massenoperationen optimieren. Dazu können Sie die Anzahl der gleichzeitigen Threads, der Warteschlangen pro Thread und die Anzahl der Datensätze angeben, die in einem Puffer unter Verwendung von ParallelApply*
-Aufgabeneinstellungen gespeichert werden sollen. Nehmen wir zum Beispiel an, Sie möchten einen CDC Ladevorgang ausführen und 128 Threads parallel anwenden. Außerdem möchten Sie auf 64 Warteschlangen pro Thread zugreifen, wobei 50 Datensätze pro Puffer gespeichert sind.
AWS DMS Unterstützt zur CDC Steigerung der Leistung die folgenden Aufgabeneinstellungen:
-
ParallelApplyThreads
— Gibt die Anzahl gleichzeitiger Threads an, die während eines CDC Ladevorgangs AWS DMS verwendet werden, um Datensätze an einen Kafka-Zielendpunkt zu übertragen. Der Standardwert ist Null (0) und der maximale Wert ist 32. -
ParallelApplyBufferSize
— Gibt die maximale Anzahl von Datensätzen an, die in jeder Pufferwarteschlange gespeichert werden sollen, damit gleichzeitige Threads während eines Ladevorgangs an einen Kafka-Zielendpunkt weitergeleitet werden. CDC Der Standardwert ist 100 und der Höchstwert 1 000. Verwenden Sie diese Option, wennParallelApplyThreads
mehrere Threads angibt. -
ParallelApplyQueuesPerThread
— Gibt die Anzahl der Warteschlangen an, auf die jeder Thread zugreift, um Datensätze aus den Warteschlangen zu entfernen und währenddessen ein Batch-Load für einen Kafka-Endpunkt zu generieren. CDC Der Standardwert ist 1. Der Höchstwert ist 512.
Wenn Sie ParallelApply*
-Aufgabeneinstellungen verwenden, ist der primary-key
der Tabelle der partition-key-type
-Standardwert, nicht schema-name.table-name
.
Mit Transport Layer Security () eine Verbindung zu Kafka herstellen TLS
Ein Kafka-Cluster akzeptiert sichere Verbindungen mithilfe von Transport Layer Security ()TLS. Mit DMS können Sie eine der folgenden drei Sicherheitsprotokolloptionen verwenden, um eine Kafka-Endpunktverbindung zu sichern.
- SSLVerschlüsselung ()
server-encryption
-
Clients validieren die Serveridentität anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.
- SSLAuthentifizierung (
mutual-authentication
) -
Server und Client validieren die Identität untereinander durch ihre eigenen Zertifikate. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.
- SASL-SSL (
mutual-authentication
) -
Die Methode Simple Authentication and Security Layer (SASL) ersetzt das Zertifikat des Clients durch einen Benutzernamen und ein Passwort, um die Identität eines Clients zu überprüfen. Sie geben einen Benutzernamen und ein Passwort an, die der Server registriert hat, damit der Server die Identität eines Clients überprüfen kann. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.
Wichtig
Apache Kafka und Amazon MSK akzeptieren aufgelöste Zertifikate. Dies ist eine bekannte Einschränkung von Kafka und AmazonMSK, die behoben werden muss. Weitere Informationen finden Sie unter Probleme mit Apache Kafka, KAFKA -3700
Wenn Sie Amazon verwendenMSK, sollten Sie die Verwendung von Zugriffskontrolllisten (ACLs) als Workaround für diese bekannte Einschränkung in Betracht ziehen. Weitere Informationen zur Verwendung ACLs finden Sie im ACLs Abschnitt Apache Kafka im Amazon Managed Streaming for Apache Kafka Developer Guide.
Wenn Sie einen selbstverwalteten Kafka-Cluster verwenden, finden Sie Informationen zur Konfiguration des Clusters im Kommentar vom 21. Oktober 18
SSLVerschlüsselung mit Amazon MSK oder einem selbstverwalteten Kafka-Cluster verwenden
Sie können SSL Verschlüsselung verwenden, um eine Endpunktverbindung zu Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern. Wenn Sie die SSL Verschlüsselungsauthentifizierungsmethode verwenden, validieren Clients die Identität eines Servers anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.
So verwenden Sie SSL Verschlüsselung, um eine Verbindung zu Amazon herzustellen MSK
-
Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (
SecurityProtocol
) mithilfe der Optionssl-encryption
fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen.Im folgenden JSON Beispiel wird das Sicherheitsprotokoll als SSL Verschlüsselung festgelegt.
"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Um SSL Verschlüsselung für einen selbstverwalteten Kafka-Cluster zu verwenden
-
Wenn Sie eine private Zertifizierungsstelle (CA) in Ihrem lokalen Kafka-Cluster verwenden, laden Sie Ihr privates CA-Zertifikat hoch und erhalten Sie einen Amazon-Ressourcennamen (). ARN
-
Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (
SecurityProtocol
) mithilfe der Optionssl-encryption
fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen. Im folgenden JSON Beispiel wird das Sicherheitsprotokoll als festgelegt.ssl-encryption
"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
-
Wenn Sie eine private Zertifizierungsstelle verwenden, geben ARN Sie die
SslCaCertificateArn
im ersten Schritt oben angegebene Zertifizierungsstelle ein.
SSLAuthentifizierung verwenden
Sie können die SSL Authentifizierung verwenden, um eine Endpunktverbindung zu Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern.
Gehen Sie wie folgt vor, um die SSL Client-Authentifizierung und Verschlüsselung mithilfe der Authentifizierung für die Verbindung mit Amazon MSK zu aktivieren:
-
Bereiten Sie einen privaten Schlüssel und ein öffentliches Zertifikat für Kafka vor.
-
Laden Sie Zertifikate in den DMS Zertifikatsmanager hoch.
-
Erstellen Sie einen Kafka-Zielendpunkt mit dem entsprechenden Zertifikat, das in den Kafka-Endpunkteinstellungen ARNs angegeben ist.
Um einen privaten Schlüssel und ein öffentliches Zertifikat für Amazon vorzubereiten MSK
-
Erstellen Sie eine EC2 Instance und richten Sie einen Client für die Verwendung der Authentifizierung ein, wie in den Schritten 1 bis 9 im Abschnitt Client-Authentifizierung im Amazon Managed Streaming for Apache Kafka Developer Guide beschrieben.
Nachdem Sie diese Schritte abgeschlossen haben, verfügen Sie über ein Zertifikat ARN (das öffentliche Zertifikat, das in ARN gespeichert istACM) und einen privaten Schlüssel, der in einer
kafka.client.keystore.jks
Datei enthalten ist. -
Rufen Sie das öffentliche Zertifikat ab und kopieren Sie das Zertifikat mit dem folgenden Befehl in die Datei
signed-certificate-from-acm.pem
:aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
Dieser Befehl gibt ähnliche Informationen wie im folgenden Beispiel zurück:
{"Certificate": "123", "CertificateChain": "456"}
Anschließend kopieren Sie Ihr Äquivalent von
"123"
in die Dateisigned-certificate-from-acm.pem
. -
Rufen Sie den privaten Schlüssel ab, indem Sie den Schlüssel
msk-rsa
auskafka.client.keystore.jks to keystore.p12
importieren, wie im folgenden Beispiel gezeigt.keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
-
Verwenden Sie den folgenden Befehl, um
keystore.p12
in das Format.pem
zu exportieren.Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
Die Meldung „PEMPassphrase eingeben“ wird angezeigt und gibt den Schlüssel an, der zur Verschlüsselung des Zertifikats verwendet wird.
-
Entfernen Sie Taschen- und Schlüsselattribute aus der
.pem
-Datei, um sicherzustellen, dass die erste Zeile mit der folgenden Zeichenfolge beginnt.---BEGIN ENCRYPTED PRIVATE KEY---
Um ein öffentliches Zertifikat und einen privaten Schlüssel in den DMS Zertifikatsmanager hochzuladen und die Verbindung zu Amazon zu testen MSK
-
Laden Sie es mit dem folgenden Befehl in den DMS Zertifikatsmanager hoch.
aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://
path to signed cert
aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
-
Erstellen Sie einen MSK Amazon-Zielendpunkt und testen Sie die Verbindung, um sicherzustellen, dass die TLS Authentifizierung funktioniert.
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Wichtig
Sie können die SSL Authentifizierung verwenden, um eine Verbindung zu einem selbstverwalteten Kafka-Cluster zu sichern. In Einzelfällen können Sie eine private Zertifizierungsstelle (CA) in Ihrem On-Premises-Kafka-Cluster verwenden. Wenn ja, laden Sie Ihre CA-Kette, Ihr öffentliches Zertifikat und Ihren privaten Schlüssel in den DMS Zertifikatsmanager hoch. Verwenden Sie dann den entsprechenden Amazon-Ressourcennamen (ARN) in Ihren Endpunkteinstellungen, wenn Sie Ihren lokalen Kafka-Zielendpunkt erstellen.
So bereiten Sie einen privaten Schlüssel und ein signiertes Zertifikat für einen selbstverwalteten Kafka-Cluster vor
-
Erzeugen Sie ein Schlüsselpaar wie im folgenden Beispiel dargestellt.
keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass
your-keystore-password
-keypassyour-key-passphrase
-dname "CN=your-cn-name
" -aliasalias-of-key-pair
-storetype pkcs12 -keyalg RSA -
Generieren Sie eine Anforderung zum Signieren eines Zertifikats ()CSR.
keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass
your-key-store-password
-keypassyour-key-password
-
Verwenden Sie die CA in Ihrem Cluster-Truststore, um das CSR zu signieren. Wenn Sie keine CA haben, können Sie Ihre eigene private CA erstellen.
openssl req -new -x509 -keyout ca-key -out ca-cert -days
validate-days
-
Importieren Sie
ca-cert
in den Truststore und Keystore des Servers. Wenn Sie keinen Truststore haben, führen Sie folgenden Befehl aus, um den Truststore zu erstellen undca-cert
in diesen zu importieren.keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
-
Signieren Sie das Zertifikat.
openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days
validate-days
-CAcreateserial -passin pass:ca-password
-
Importieren Sie das signierte Zertifikat in den Keystore.
keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass
your-keystore-password
-keypassyour-key-password
-
Verwenden Sie den folgenden Befehl, um den Schlüssel
on-premise-rsa
vonkafka.server.keystore.jks
inkeystore.p12
zu importieren.keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass
your-truststore-password
\ -destkeypassyour-key-password
-
Verwenden Sie den folgenden Befehl, um
keystore.p12
in das Format.pem
zu exportieren.Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
-
Laden Sie
encrypted-private-server-key.pem
signed-certificate.pem
, und inca-cert
den DMS Zertifikatsmanager hoch. -
Erstellen Sie einen Endpunkt mithilfe des zurückgegebenenARNs.
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "
your-client-cert-arn
","SslClientKeyArn": "your-client-key-arn
","SslClientKeyPassword":"your-client-key-password
", "SslCaCertificateArn": "your-ca-certificate-arn
"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Verwenden von SASL — SSL Authentifizierung für die Verbindung zu Amazon MSK
Die Methode Simple Authentication and Security Layer (SASL) verwendet einen Benutzernamen und ein Passwort, um die Identität eines Kunden zu überprüfen, und stellt eine verschlüsselte Verbindung zwischen Server und Client her.
Für die Verwendung SASL erstellen Sie zunächst einen sicheren Benutzernamen und ein sicheres Passwort, wenn Sie Ihren MSK Amazon-Cluster einrichten. Eine Beschreibung, wie Sie einen sicheren Benutzernamen und ein sicheres Passwort für einen MSK Amazon-Cluster einrichten, finden Sie unter Setting SASL SCRAM up/authentication for a Amazon MSK Cluster im Amazon Managed Streaming for Apache Kafka Developer Guide.
Wenn Sie dann Ihren Kafka-Zielendpunkt erstellen, legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol
) mithilfe der Option sasl-ssl
fest. Legen Sie auch die Optionen SaslUsername
und SaslPassword
fest. Stellen Sie sicher, dass diese mit dem sicheren Benutzernamen und dem Passwort übereinstimmen, die Sie bei der ersten Einrichtung Ihres MSK Amazon-Clusters erstellt haben, wie im folgenden JSON Beispiel gezeigt.
"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"
Amazon MSK cluster secure user name
", "SaslPassword":"Amazon MSK cluster secure password
" }
Anmerkung
-
AWS DMS Unterstützt derzeit nur öffentliche, von CA gestützte SASL -SSL. DMSunterstützt nicht SASL — SSL zur Verwendung mit selbstverwaltetem Kafka, das von einer privaten CA unterstützt wird.
-
Für die SASL SSL Authentifizierung wird AWS DMS standardmäßig der Mechanismus SCRAM SHA -512 unterstützt. AWS DMS Versionen 3.5.0 und höher unterstützen auch den Plain-Mechanismus. Um den Plain-Mechanismus zu unterstützen, setzen Sie den
SaslMechanism
Parameter desKafkaSettings
API Datentyps aufPLAIN
.
Verwenden Sie ein Vorher-Bild, um die ursprünglichen Werte von CDC Zeilen für Apache Kafka als Ziel anzuzeigen
Wenn Sie CDC Aktualisierungen in ein Datenstreaming-Ziel wie Kafka schreiben, können Sie die ursprünglichen Werte einer Quelldatenbankzeile anzeigen, bevor sie durch ein Update geändert werden. Um dies zu ermöglichen, AWS DMS füllt es ein Vorher-Bild von Aktualisierungsereignissen auf der Grundlage von Daten aus, die von der Quelldatenbank-Engine bereitgestellt werden.
Verschiedene Quelldatenbank-Engines liefern unterschiedliche Mengen an Informationen für ein Vorher-Abbild:
-
Oracle stellt für Spalten nur dann Aktualisierungen bereit, wenn sie sich ändern.
-
Postgre SQL stellt nur Daten für Spalten bereit, die Teil des Primärschlüssels sind (geändert oder nicht). Wenn die logische Replikation verwendet wird und für die Quelltabelle festgelegt REPLICA IDENTITY FULL ist, können Sie vollständige Vorher-Nachher-Informationen zu der Zeile abrufen, die in die Zeile geschrieben wurde WALs und hier verfügbar ist.
-
My stellt SQL im Allgemeinen Daten für alle Spalten bereit (geändert oder nicht).
Verwenden Sie entweder die BeforeImageSettings
-Aufgabeneinstellung oder den add-before-image-columns
-Parameter, um die Erstellung von Vorher-Abbildern zum Hinzufügen von Originalwerten aus der Quelldatenbank zur AWS DMS -Ausgabe zu aktivieren. Dieser Parameter wendet eine Spalten-Transformationsregel an.
BeforeImageSettings
fügt jedem Aktualisierungsvorgang mit Werten, die aus dem Quelldatenbanksystem gesammelt wurden, ein neues JSON Attribut hinzu, wie im Folgenden gezeigt.
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Anmerkung
Gilt für BeforeImageSettings
CDC Aufgaben mit Volllast plus (bei denen vorhandene Daten migriert und laufende Änderungen repliziert werden) oder CDC nur auf Aufgaben (bei denen nur Datenänderungen repliziert werden). Wenden Sie BeforeImageSettings
nicht auf Nur-Volllast-Aufgaben an.
Für BeforeImageSettings
-Optionen gilt Folgendes:
-
Legen Sie die
EnableBeforeImage
-Option vor dem Imaging auftrue
fest. Der Standardwert istfalse
. -
Verwenden Sie die
FieldName
Option, um dem neuen JSON Attribut einen Namen zuzuweisen. WannEnableBeforeImage
true
ist, istFieldName
erforderlich und darf nicht leer sein. -
Die
ColumnFilter
-Option gibt eine Spalte an, die vor dem Imaging hinzugefügt werden soll. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, verwenden Sie den Standardwertpk-only
. Um nur Spalten hinzuzufügen, die nicht vom LOB Typ sind, verwenden Sienon-lob
. Wenn Sie eine Spalte hinzufügen möchten, die einen Vorher-Abbild-Wert hat, verwenden Sieall
."BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
Verwenden einer Vorher-Abbild-Transformationsregel
Alternativ zu den Aufgabeneinstellungen können Sie den add-before-image-columns
-Parameter verwenden, der eine Spalten-Transformationsregel anwendet. Mit diesem Parameter können Sie das Imaging CDC auf Datenstreaming-Zielen wie Kafka vor dem Start aktivieren.
Wenn Sie add-before-image-columns
in einer Transformationsregel verwenden, können Sie eine feinere Steuerung der Ergebnisse für das Vorher-Abbild anwenden. Mit Transformationsregeln können Sie einen Objekt-Locator verwenden, der Ihnen die Kontrolle über die für die Regel ausgewählten Tabellen gibt. Außerdem können Sie Transformationsregeln miteinander verketten, wodurch verschiedene Regeln auf verschiedene Tabellen angewendet werden können. Anschließend können Sie die erzeugten Spalten mithilfe anderer Regeln bearbeiten.
Anmerkung
Verwenden Sie den add-before-image-columns
-Parameter nicht zusammen mit der BeforeImageSettings
-Aufgabeneinstellung innerhalb derselben Aufgabe. Verwenden Sie stattdessen entweder den Parameter oder die Einstellung, aber nicht beide, für eine einzelne Aufgabe.
Ein transformation
-Regeltyp mit dem add-before-image-columns
-Parameter für eine Spalte muss einen before-image-def
-Abschnitt bereitstellen. Es folgt ein Beispiel.
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
Der Wert von column-prefix
wird einem Spaltennamen vorangestellt, und der Standardwert von column-prefix
ist BI_
. Der Wert von column-suffix
wird an den Spaltennamen angehängt, und der Standardwert ist leer. Setzen Sie nicht column-prefix
und column-suffix
auf leere Zeichenfolgen.
Wählen Sie einen Wert für column-filter
. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, wählen Sie pk-only
. Wählen Sie ausnon-lob
, dass nur Spalten hinzugefügt werden sollen, die nicht vom LOB Typ sind. Oder lassenall
Sie eine Spalte hinzufügen, die einen Vorher-Abbild-Wert hat.
Beispiel für eine Vorher-Abbild-Transformationsregel
Die Transformationsregel im folgenden Beispiel fügt eine neue Spalte mit dem Namen BI_emp_no
auf dem Ziel hinzu. Eine Anweisung wie UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
füllt daher das BI_emp_no
Feld mit 1. Wenn Sie CDC Aktualisierungen für Amazon S3 S3-Ziele schreiben, können Sie anhand der BI_emp_no
Spalte erkennen, welche ursprüngliche Zeile aktualisiert wurde.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
Weitere Informationen zur Verwendung der add-before-image-columns
-Regelaktion finden Sie unter Transformationsregeln und Aktionen.
Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service
Bei der Verwendung von Apache Kafka als Ziel gelten die folgenden Einschränkungen:
-
AWS DMS Kafka-Zielendpunkte unterstützen keine IAM Zugriffskontrolle für Amazon Managed Streaming for Apache Kafka (Amazon). MSK
-
LOBDer Vollmodus wird nicht unterstützt.
-
Geben Sie eine Kafka-Konfigurationsdatei für Ihren Cluster mit Eigenschaften AWS DMS an, mit denen Sie automatisch neue Themen erstellen können. Schließen Sie die Einstellung
auto.create.topics.enable = true
ein. Wenn Sie Amazon verwendenMSK, können Sie bei der Erstellung Ihres Kafka-Clusters die Standardkonfiguration angeben und dieauto.create.topics.enable
Einstellung dann auftrue
ändern. Weitere Informationen zu den Standardkonfigurationseinstellungen finden Sie unter Die MSK Amazon-Standardkonfiguration im Amazon Managed Streaming for Apache Kafka Developer Guide. Wenn Sie einen vorhandenen Kafka-Cluster ändern müssen, der mit Amazon erstellt wurdeMSK, führen Sie den AWS CLI Befehl aus,aws kafka create-configuration
um Ihre Kafka-Konfiguration zu aktualisieren, wie im folgenden Beispiel gezeigt:14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }
Hier ist
//~/kafka_configuration
die Konfigurationsdatei, die Sie mit den erforderlichen Eigenschaftseinstellungen erstellt haben.Wenn Sie Ihre eigene Kafka-Instance verwenden, die auf Amazon installiert istEC2, ändern Sie die Kafka-Cluster-Konfiguration mit der
auto.create.topics.enable = true
Einstellung, dass mithilfe der in Ihrer Instance bereitgestellten Optionen automatisch neue Themen erstellt werden können AWS DMS . -
AWS DMS veröffentlicht unabhängig von den Transaktionen jedes Update eines einzelnen Datensatzes in der Quelldatenbank als einen Datensatz (Nachricht) in einem bestimmten Kafka-Thema.
-
AWS DMS unterstützt die folgenden zwei Formen für Partitionsschlüssel:
-
SchemaName.TableName
: Eine Kombination des Schema- und Tabellennamens. -
${AttributeName}
: Der Wert eines der Felder in der JSON oder der Primärschlüssel der Tabelle in der Quelldatenbank.
-
-
BatchApply
wird für einen Kafka-Endpunkt nicht unterstützt. Bei Verwendung von Batch Apply (z. B. der Zielmetadaten-AufgabeneinstellungBatchApplyEnabled
) für ein Kafka-Ziel kann es zu einem Datenverlust kommen. -
AWS DMS unterstützt nicht die Migration von Werten eines
BigInt
Datentyps mit mehr als 16 Ziffern. Um diese Einschränkung zu umgehen, können Sie die folgende Transformationsregel verwenden, um dieBigInt
-Spalte in eine Zeichenfolge zu konvertieren. Informationen zu Transformationsregeln finden Sie unter Transformationsregeln und Aktionen.{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
Verwenden der Objektzuweisung zum Migrieren von Daten zu einem Kafka-Thema
AWS DMS verwendet Regeln für die Tabellenzuordnung, um Daten aus der Quelle dem Kafka-Zielthema zuzuordnen. Um Daten einem Zielthema zuzuweisen, verwenden Sie eine Art von Tabellenzuweisungsregel, die als Objektzuweisung bezeichnet wird. Mit der Objektzuweisung legen Sie fest, wie Datensätze in der Quelle den in einem Kafka-Thema veröffentlichten Datensätzen zugewiesen werden.
Kafka-Themen verfügen bis auf einen Partitionsschlüssel über keine voreingestellte Struktur.
Anmerkung
Sie müssen die Objektzuweisung nicht verwenden. Sie können die reguläre Tabellenzuweisung für verschiedene Transformationen verwenden. Der Partitionsschlüsseltyp folgt jedoch diesen Standardverhaltensweisen:
-
Der Primärschlüssel wird als Partitionsschlüssel für Volllast verwendet.
-
Wird als Partitionsschlüssel für verwendet, wenn keine Aufgabeneinstellungen zur parallelen Anwendung
schema.table
verwendet werden. CDC -
Wenn Aufgabeneinstellungen mit parallelem Anwenden verwendet werden, wird der Primärschlüssel als Partitionsschlüssel für verwendet. CDC
Um eine Objektzuweisungsregel zu erstellen, legen Sie rule-type
als object-mapping
fest. Diese Regel gibt an, welchen Objektzuweisungstyp Sie verwenden möchten.
Die Struktur für die Regel lautet wie folgt.
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS unterstützt derzeit map-record-to-record
und map-record-to-document
als einzig gültige Werte für den rule-action
Parameter. Diese Einstellungen wirken sich auf Werte aus, die nicht in der exclude-columns
-Attributliste ausgeschlossen sind. Die map-record-to-document
Werte map-record-to-record
und geben an, wie diese Datensätze standardmäßig AWS DMS behandelt werden. Diese Werte wirken sich in keiner Weise auf die Attributzuweisungen aus.
Verwenden Sie map-record-to-record
beim Migrieren aus einer relationalen Datenbank zu einem Kafka-Thema. Dieser Regeltyp verwendet den taskResourceId.schemaName.tableName
-Wert aus der relationalen Datenbank als Partitionsschlüssel in dem Kafka-Thema und erstellt ein Attribut für jede Spalte in der Quelldatenbank.
Beachten Sie bei Verwendung von map-record-to-record
Folgendes:
Diese Einstellung wirkt sich nur auf Spalten aus, die durch die
exclude-columns
-Liste ausgeschlossen wurden.AWS DMS Erstellt für jede dieser Spalten ein entsprechendes Attribut im Zielthema.
AWS DMS erstellt dieses entsprechende Attribut unabhängig davon, ob die Quellspalte in einer Attributzuordnung verwendet wird.
Eine Möglichkeit, map-record-to-record
zu verstehen, besteht darin, sich die praktische Anwendung zu veranschaulichen. In diesem Beispiel wird davon ausgegangen, dass Sie mit einer Tabellenzeile einer relationalen Datenbank beginnen, die die folgende Struktur aufweist und die folgenden Daten enthält.
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
02/29/1988 |
Um diese Informationen von einem Schema mit dem Namen Test
zu einem Kafka-Thema zu migrieren, erstellen Sie Regeln, um die Daten dem Zielthema zuzuweisen. Die folgende Regel veranschaulicht die Zuweisung.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
Nachfolgend wird das resultierende Datensatzformat bei Verwendung unserer Beispieldaten in dem Kafka-Zielthema für ein bestimmtes Kafka-Thema und einen bestimmten Partitionsschlüssel (in diesem Fall taskResourceId.schemaName.tableName
) illustriert.
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
Themen
Umstrukturieren von Daten mit Attributzuweisung
Sie können die Daten umstrukturieren, während Sie sie mittels einer Attributzuweisung zu einem Kafka-Thema migrieren. So möchten Sie zum Beispiel vielleicht mehrere Felder in der Quelle in einem einzigen Feld im Ziel vereinen. Die folgenden Attributzuordnung veranschaulicht, wie die Daten umstrukturiert werden.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
Um einen konstanten Wert für partition-key
festzulegen, geben Sie einen partition-key
-Wert an. So könnten Sie auf diese Weise beispielsweise erzwingen, dass alle Daten in einer einzigen Partition gespeichert werden. Die folgende Darstellung veranschaulicht dieses Konzept.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Anmerkung
Der partition-key
-Wert für einen Steuerungsdatensatz, der für eine spezifische Tabelle bestimmt ist, lautet TaskId.SchemaName.TableName
. Der partition-key
-Wert für einen Steuerungsdatensatz, der für eine spezifische Aufgabe bestimmt ist, ist die TaskId
des betreffenden Datensatzes. Wenn Sie einen partition-key
-Wert in der Objektzuweisung angeben, hat dies keine Auswirkungen auf den partition-key
für einen Steuerungsdatensatz.
Replikation für mehrere Themen mithilfe der Objektzuweisung
Standardmäßig migrieren AWS DMS Aufgaben alle Quelldaten zu einem der folgenden Kafka-Themen:
Wie im Feld Thema des AWS DMS Zielendpunkts angegeben.
Wie von
kafka-default-topic
angegeben, wenn das Feld Thema des Zielendpunkts nicht gefüllt ist und die Kafka-Einstellungauto.create.topics.enable
auftrue
gesetzt ist.
Bei AWS DMS Engine-Versionen 3.4.6 und höher können Sie das kafka-target-topic
Attribut verwenden, um jede migrierte Quelltabelle einem separaten Thema zuzuordnen. Mit den folgenden Objektzuweisungsregeln werden beispielsweise die Quelltabellen Customer
und Address
zu den Kafka-Themen customer_topic
bzw. address_topic
migriert. AWS DMS Migriert gleichzeitig alle anderen Quelltabellen, einschließlich der Bills
Tabelle im Test
Schema, zu dem im Zielendpunkt angegebenen Thema.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }
Mithilfe der Kafka-Replikation für mehrere Themen können Sie Quelltabellen mit einer einzigen Replikationsaufgabe gruppieren und zu separaten Kafka-Themen migrieren.
Nachrichtenformat für Apache Kafka
Die JSON Ausgabe ist einfach eine Liste von Schlüssel-Wert-Paaren.
- RecordType
-
Der Datensatztyp kann entweder für Daten oder zur Steuerung bestimmt sein. Datensätze für Datenrepräsentieren die tatsächlichen Zeilen in der Quelle. Steuerungsdatensätze sind für wichtige Ereignisse im Stream bestimmt, z. B. einen Neustart der Aufgabe.
- Operation
-
Mögliche Operationen für Datensätze sind
load
,insert
,update
oderdelete
.Mögliche Operationen für Steuerungsdatensätze sind
create-table
,rename-table
,drop-table
,change-columns
,add-column
,drop-column
,rename-column
odercolumn-type-change
. - SchemaName
-
Das Quellschema für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.
- TableName
-
Die Quelltabelle für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.
- Zeitstempel
-
Der Zeitstempel für den Zeitpunkt, JSON an dem die Nachricht erstellt wurde. Das Feld ist mit dem ISO 8601-Format formatiert.
Das folgende JSON Nachrichtenbeispiel veranschaulicht einen Datentyp „Nachricht“ mit allen zusätzlichen Metadaten.
{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }
Das folgende JSON Nachrichtenbeispiel veranschaulicht eine Meldung vom Typ Steuerelement.
{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }