Verwendung von Apache Kafka als Ziel für AWS Database Migration Service - AWS Database Migration Service

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwendung von Apache Kafka als Ziel für AWS Database Migration Service

Sie können es verwenden AWS DMS , um Daten zu einem Apache Kafka-Cluster zu migrieren. Apache Kafka ist eine verteilte Streaming-Plattform. Mit Apache Kafka können Sie Streaming-Daten in Echtzeit erfassen und verarbeiten.

AWS bietet auch Amazon Managed Streaming for Apache Kafka (AmazonMSK) zur Verwendung als AWS DMS Ziel an. Amazon MSK ist ein vollständig verwalteter Apache Kafka-Streaming-Service, der die Implementierung und Verwaltung von Apache Kafka-Instances vereinfacht. Es funktioniert mit Open-Source-Versionen von Apache Kafka, und Sie greifen genau wie jede Apache Kafka-Instance auf MSK Amazon-Instances als AWS DMS Ziele zu. Weitere Informationen finden Sie unter Was ist AmazonMSK? im Amazon Managed Streaming for Apache Kafka Developer Guide.

Ein Kafka-Cluster speichert Streams von Datensätzen in Kategorien, die als Themen bezeichnet und in Partitionen unterteilt werden. Partitionen sind eindeutig identifizierte Sequenzen von Datensätzen (Nachrichten) in einem Thema. Partitionen können über mehrere Broker in einem Cluster verteilt werden, um die parallele Verarbeitung der Datensätze eines Themas zu ermöglichen. Weitere Informationen zu Themen und Partitionen und ihrer Verteilung in Apache Kafka finden Sie unter Themen und Protokolle und Verteilung.

Ihr Kafka-Cluster kann entweder eine MSK Amazon-Instance, ein Cluster, der auf einer EC2 Amazon-Instance läuft, oder ein lokaler Cluster sein. Eine MSK Amazon-Instance oder ein Cluster auf einer EC2 Amazon-Instance kann sich in derselben VPC oder einer anderen Instance befinden. Wenn es sich um einen On-Premises-Cluster handelt, können Sie Ihren eigenen On-Premises-Namensserver für Ihre Replikations-Instance verwenden, um den Host-Namen des Clusters aufzulösen. Informationen zum Einrichten eines Namensservers für Ihre Replikations-Instance finden Sie unter Verwenden Ihres eigenen Vor-Ort-Nameservers. Weitere Informationen zum Einrichten eines Netzwerks finden Sie unter Einrichten eines Netzwerks für eine Replikations-Instance.

Wenn Sie einen MSK Amazon-Cluster verwenden, stellen Sie sicher, dass dessen Sicherheitsgruppe den Zugriff von Ihrer Replikationsinstanz aus ermöglicht. Informationen zum Ändern der Sicherheitsgruppe für einen MSK Amazon-Cluster finden Sie unter Ändern der Sicherheitsgruppe eines MSK Amazon-Clusters.

AWS Database Migration Service veröffentlicht Datensätze zu einem Kafka-Thema mitJSON. AWS DMS Serialisiert während der Konvertierung jeden Datensatz aus der Quelldatenbank in ein Attribut-Wert-Paar im Format. JSON

Zum Migrieren Ihrer Daten von einer unterstützten Datenquelle zu einem Kafka-Ziel-Cluster verwenden Sie die Objektzuweisung. Mit der Objektzuweisung bestimmen Sie, wie die Datensätze im Zielthema strukturiert werden sollen. Außerdem definieren Sie einen Partitionsschlüssel für jede Tabelle, die Apache Kafka zum Gruppieren der Daten in seine Partitionen verwendet.

AWS DMS Unterstützt derzeit ein einzelnes Thema pro Aufgabe. Bei einer einzelnen Aufgabe mit mehreren Tabellen beziehen sich alle Nachrichten auf ein einzelnes Thema. Jede Nachricht enthält einen Metadaten-Abschnitt, der das Zielschema und die Zieltabelle identifiziert. AWS DMS Versionen 3.4.6 und höher unterstützen die themenübergreifende Replikation mithilfe von Objektzuordnung. Weitere Informationen finden Sie unter Replikation für mehrere Themen mithilfe der Objektzuweisung.

Apache Kafka-Endpunkteinstellungen

Sie können Verbindungsdetails über die Endpunkteinstellungen in der AWS DMS Konsole oder über die --kafka-settings Option in der angeben. CLI Es folgen die Anforderungen für jede Einstellung:

  • Broker – Geben Sie die Speicherorte eines oder mehrerer Broker in Ihrem Kafka-Cluster jeweils in Form einer durch Kommata getrennten Liste mit broker-hostname:port an. Ein Beispiel ist "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Mit dieser Einstellung können Sie die Speicherorte einiger oder aller Broker im Cluster angeben. Die Cluster-Broker kommunizieren miteinander, um die Partitionierung von Datensätzen zu verarbeiten, die zu dem Thema migriert wurden.

  • Topic – (Optional) Geben Sie den Themennamen mit einer maximalen Länge von 255 Buchstaben und Symbolen an. Sie können Punkt (.), Unterstrich (_) und Minuszeichen (-) verwenden. Themennamen mit einem Punkt (.) oder einem Unterstrich (_) können in internen Datenstrukturen kollidieren. Verwenden Sie entweder eines, aber nicht beide dieser Symbole im Namen des Themas. Wenn Sie keinen Themennamen angeben, AWS DMS verwendet "kafka-default-topic" als Migrationsthema.

    Anmerkung

    Wenn Sie entweder ein von Ihnen angegebenes Migrationsthema oder das Standardthema AWS DMS erstellen möchten, das auto.create.topics.enable = true als Teil Ihrer Kafka-Cluster-Konfiguration festgelegt wurde. Weitere Informationen finden Sie unter Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service

  • MessageFormat – Das Ausgabeformat für die Datensätze, die auf dem Endpunkt erstellt wurden. Das Nachrichtenformat ist JSON (Standard) oder JSON_UNFORMATTED (eine einzelne Zeile ohne Tabulator).

  • MessageMaxBytes – Die maximale Größe in Byte für Datensätze, die auf dem Endpunkt erstellt wurden. Der Standardwert ist 1.000.000.

    Anmerkung

    Sie können den Wert AWS CLI/nur verwenden, um zu einem SDK Wert MessageMaxBytes zu wechseln, der nicht dem Standard entspricht. Verwenden Sie beispielsweise den folgenden Befehl, um Ihren vorhandenen Kafka-Endpunkt und MessageMaxBytes zu ändern.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – Stellt detaillierte Transaktionsinformationen aus der Quelldatenbank bereit. Diese Informationen beinhalten einen Durchführungszeitstempel, eine Protokollposition sowie Werte für transaction_id, previous_transaction_id und transaction_record_id (den Datensatzoffset innerhalb einer Transaktion). Der Standardwert ist false.

  • IncludePartitionValue – Zeigt den Partitionswert innerhalb der Kafka-Nachrichtenausgabe an, es sei denn, der Partitionstyp ist schema-table-type. Der Standardwert ist false.

  • PartitionIncludeSchemaTable – Fügt Schema- und Tabellennamen zu Partitionswerten als Präfix hinzu, wenn der Partitionstyp primary-key-type ist. Dadurch wird die Datenverteilung zwischen Kafka-Partitionen erhöht. Angenommen, ein SysBench-Schema hat Tausende von Tabellen und jede davon hat nur einen begrenzten Bereich für einen Primärschlüssel. In diesem Fall wird derselbe Primärschlüssel von Tausenden von Tabellen an dieselbe Partition gesendet, was zu einer Drosselung führt. Der Standardwert ist false.

  • IncludeTableAlterOperations— Schließt alle Operationen der Datendefinitionssprache (DDL) ein, die die Tabelle in den Steuerdaten ändern, z. B. rename-tabledrop-table,, add-columndrop-column, undrename-column. Der Standardwert ist false.

  • IncludeControlDetails – Zeigt detaillierte Steuerungsinformationen für Tabellendefinition, Spaltendefinition und Tabellen- und Spaltenänderungen in der Kafka-Nachrichtenausgabe an. Der Standardwert ist false.

  • IncludeNullAndEmpty— Schließt Spalten in das Ziel ein NULL und leert sie. Der Standardwert ist false.

  • SecurityProtocol— Stellt mithilfe von Transport Layer Security (TLS) eine sichere Verbindung zu einem Kafka-Zielendpunkt her. Optionen: ssl-authentication, ssl-encryption und sasl-ssl. Für die Verwendung von sasl-ssl sind SaslUsername und SaslPassword erforderlich.

  • SslEndpointIdentificationAlgorithm— Legt die Überprüfung des Hostnamens für das Zertifikat fest. Diese Einstellung wird in AWS DMS Version 3.5.1 und höher unterstützt. Es gibt die folgenden Optionen:

    • NONE: Deaktiviert die Überprüfung des Hostnamens des Brokers in der Client-Verbindung.

    • HTTPS: Aktiviert die Überprüfung des Hostnamens des Brokers in der Client-Verbindung.

  • useLargeIntegerValue— Verwenden Sie bis zu 18-stellige Ganzzahlen, anstatt Ganzzahlen als Doppelzahl umzuwandeln. Dies ist ab AWS DMS Version 3.5.4 verfügbar. Der Standardwert lautet „false“.

Sie können Einstellungen verwenden, um die Übertragungsgeschwindigkeit zu erhöhen. Dazu unterstützt AWS DMS Multi-Thread-Volllastvorgänge in einen Ziel-Cluster in Apache Kafka. AWS DMS unterstützt Multi-Threading u. a. mithilfe der folgenden Aufgabeneinstellungen:

  • MaxFullLoadSubTasks— Verwenden Sie diese Option, um die maximale Anzahl von Quelltabellen anzugeben, die parallel geladen werden sollen. AWS DMS lädt jede Tabelle mithilfe einer speziellen Unteraufgabe in die entsprechende Kafka-Zieltabelle. Der Standardwert beträgt 8; der Maximalwert beträgt 49.

  • ParallelLoadThreads— Verwenden Sie diese Option, um die Anzahl der Threads anzugeben, die AWS DMS verwendet werden, um jede Tabelle in ihre Kafka-Zieltabelle zu laden. Der maximale Wert für ein Apache Kafka-Ziel ist 32. Sie können eine Erhöhung dieses Höchstwerts anfordern.

  • ParallelLoadBufferSize – Verwenden Sie diese Option, um die maximale Anzahl der Datensätze anzugeben, die in dem Puffer gespeichert werden sollen, den die parallelen Lade-Threads zum Laden von Daten in das Kafka-Ziel verwenden. Der Standardwert lautet 50. Die maximale Wert ist 1.000. Verwenden Sie diese Einstellung mit ParallelLoadThreads; ParallelLoadBufferSize ist nur gültig, wenn es mehr als einen Thread gibt.

  • ParallelLoadQueuesPerThread – Verwenden Sie diese Option, um die Anzahl der Warteschlangen anzugeben, auf die jeder gleichzeitige Thread zugreift, um Datensätze aus Warteschlangen zu entfernen und eine Stapellast für das Ziel zu generieren. Der Standardwert ist 1. Der Höchstwert ist 512.

Sie können die Leistung von Change Data Capture (CDC) für Kafka-Endpunkte verbessern, indem Sie die Aufgabeneinstellungen für parallel Threads und Massenoperationen optimieren. Dazu können Sie die Anzahl der gleichzeitigen Threads, der Warteschlangen pro Thread und die Anzahl der Datensätze angeben, die in einem Puffer unter Verwendung von ParallelApply*-Aufgabeneinstellungen gespeichert werden sollen. Nehmen wir zum Beispiel an, Sie möchten einen CDC Ladevorgang ausführen und 128 Threads parallel anwenden. Außerdem möchten Sie auf 64 Warteschlangen pro Thread zugreifen, wobei 50 Datensätze pro Puffer gespeichert sind.

AWS DMS Unterstützt zur CDC Steigerung der Leistung die folgenden Aufgabeneinstellungen:

  • ParallelApplyThreads— Gibt die Anzahl gleichzeitiger Threads an, die während eines CDC Ladevorgangs AWS DMS verwendet werden, um Datensätze an einen Kafka-Zielendpunkt zu übertragen. Der Standardwert ist Null (0) und der maximale Wert ist 32.

  • ParallelApplyBufferSize— Gibt die maximale Anzahl von Datensätzen an, die in jeder Pufferwarteschlange gespeichert werden sollen, damit gleichzeitige Threads während eines Ladevorgangs an einen Kafka-Zielendpunkt weitergeleitet werden. CDC Der Standardwert ist 100 und der Höchstwert 1 000. Verwenden Sie diese Option, wenn ParallelApplyThreads mehrere Threads angibt.

  • ParallelApplyQueuesPerThread— Gibt die Anzahl der Warteschlangen an, auf die jeder Thread zugreift, um Datensätze aus den Warteschlangen zu entfernen und währenddessen ein Batch-Load für einen Kafka-Endpunkt zu generieren. CDC Der Standardwert ist 1. Der Höchstwert ist 512.

Wenn Sie ParallelApply*-Aufgabeneinstellungen verwenden, ist der primary-key der Tabelle der partition-key-type-Standardwert, nicht schema-name.table-name.

Mit Transport Layer Security () eine Verbindung zu Kafka herstellen TLS

Ein Kafka-Cluster akzeptiert sichere Verbindungen mithilfe von Transport Layer Security ()TLS. Mit DMS können Sie eine der folgenden drei Sicherheitsprotokolloptionen verwenden, um eine Kafka-Endpunktverbindung zu sichern.

SSLVerschlüsselung () server-encryption

Clients validieren die Serveridentität anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

SSLAuthentifizierung (mutual-authentication)

Server und Client validieren die Identität untereinander durch ihre eigenen Zertifikate. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

SASL-SSL (mutual-authentication)

Die Methode Simple Authentication and Security Layer (SASL) ersetzt das Zertifikat des Clients durch einen Benutzernamen und ein Passwort, um die Identität eines Clients zu überprüfen. Sie geben einen Benutzernamen und ein Passwort an, die der Server registriert hat, damit der Server die Identität eines Clients überprüfen kann. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

Wichtig

Apache Kafka und Amazon MSK akzeptieren aufgelöste Zertifikate. Dies ist eine bekannte Einschränkung von Kafka und AmazonMSK, die behoben werden muss. Weitere Informationen finden Sie unter Probleme mit Apache Kafka, KAFKA -3700.

Wenn Sie Amazon verwendenMSK, sollten Sie die Verwendung von Zugriffskontrolllisten (ACLs) als Workaround für diese bekannte Einschränkung in Betracht ziehen. Weitere Informationen zur Verwendung ACLs finden Sie im ACLs Abschnitt Apache Kafka im Amazon Managed Streaming for Apache Kafka Developer Guide.

Wenn Sie einen selbstverwalteten Kafka-Cluster verwenden, finden Sie Informationen zur Konfiguration des Clusters im Kommentar vom 21. Oktober 18.

SSLVerschlüsselung mit Amazon MSK oder einem selbstverwalteten Kafka-Cluster verwenden

Sie können SSL Verschlüsselung verwenden, um eine Endpunktverbindung zu Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern. Wenn Sie die SSL Verschlüsselungsauthentifizierungsmethode verwenden, validieren Clients die Identität eines Servers anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

So verwenden Sie SSL Verschlüsselung, um eine Verbindung zu Amazon herzustellen MSK
  • Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option ssl-encryption fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen.

    Im folgenden JSON Beispiel wird das Sicherheitsprotokoll als SSL Verschlüsselung festgelegt.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Um SSL Verschlüsselung für einen selbstverwalteten Kafka-Cluster zu verwenden
  1. Wenn Sie eine private Zertifizierungsstelle (CA) in Ihrem lokalen Kafka-Cluster verwenden, laden Sie Ihr privates CA-Zertifikat hoch und erhalten Sie einen Amazon-Ressourcennamen (). ARN

  2. Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option ssl-encryption fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen. Im folgenden JSON Beispiel wird das Sicherheitsprotokoll als festgelegt. ssl-encryption

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Wenn Sie eine private Zertifizierungsstelle verwenden, geben ARN Sie die SslCaCertificateArn im ersten Schritt oben angegebene Zertifizierungsstelle ein.

SSLAuthentifizierung verwenden

Sie können die SSL Authentifizierung verwenden, um eine Endpunktverbindung zu Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern.

Gehen Sie wie folgt vor, um die SSL Client-Authentifizierung und Verschlüsselung mithilfe der Authentifizierung für die Verbindung mit Amazon MSK zu aktivieren:

  • Bereiten Sie einen privaten Schlüssel und ein öffentliches Zertifikat für Kafka vor.

  • Laden Sie Zertifikate in den DMS Zertifikatsmanager hoch.

  • Erstellen Sie einen Kafka-Zielendpunkt mit dem entsprechenden Zertifikat, das in den Kafka-Endpunkteinstellungen ARNs angegeben ist.

Um einen privaten Schlüssel und ein öffentliches Zertifikat für Amazon vorzubereiten MSK
  1. Erstellen Sie eine EC2 Instance und richten Sie einen Client für die Verwendung der Authentifizierung ein, wie in den Schritten 1 bis 9 im Abschnitt Client-Authentifizierung im Amazon Managed Streaming for Apache Kafka Developer Guide beschrieben.

    Nachdem Sie diese Schritte abgeschlossen haben, verfügen Sie über ein Zertifikat ARN (das öffentliche Zertifikat, das in ARN gespeichert istACM) und einen privaten Schlüssel, der in einer kafka.client.keystore.jks Datei enthalten ist.

  2. Rufen Sie das öffentliche Zertifikat ab und kopieren Sie das Zertifikat mit dem folgenden Befehl in die Datei signed-certificate-from-acm.pem:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Dieser Befehl gibt ähnliche Informationen wie im folgenden Beispiel zurück:

    {"Certificate": "123", "CertificateChain": "456"}

    Anschließend kopieren Sie Ihr Äquivalent von "123" in die Datei signed-certificate-from-acm.pem.

  3. Rufen Sie den privaten Schlüssel ab, indem Sie den Schlüssel msk-rsa aus kafka.client.keystore.jks to keystore.p12 importieren, wie im folgenden Beispiel gezeigt.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Verwenden Sie den folgenden Befehl, um keystore.p12 in das Format .pem zu exportieren.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Die Meldung „PEMPassphrase eingeben“ wird angezeigt und gibt den Schlüssel an, der zur Verschlüsselung des Zertifikats verwendet wird.

  5. Entfernen Sie Taschen- und Schlüsselattribute aus der .pem-Datei, um sicherzustellen, dass die erste Zeile mit der folgenden Zeichenfolge beginnt.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Um ein öffentliches Zertifikat und einen privaten Schlüssel in den DMS Zertifikatsmanager hochzuladen und die Verbindung zu Amazon zu testen MSK
  1. Laden Sie es mit dem folgenden Befehl in den DMS Zertifikatsmanager hoch.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Erstellen Sie einen MSK Amazon-Zielendpunkt und testen Sie die Verbindung, um sicherzustellen, dass die TLS Authentifizierung funktioniert.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Wichtig

Sie können die SSL Authentifizierung verwenden, um eine Verbindung zu einem selbstverwalteten Kafka-Cluster zu sichern. In Einzelfällen können Sie eine private Zertifizierungsstelle (CA) in Ihrem On-Premises-Kafka-Cluster verwenden. Wenn ja, laden Sie Ihre CA-Kette, Ihr öffentliches Zertifikat und Ihren privaten Schlüssel in den DMS Zertifikatsmanager hoch. Verwenden Sie dann den entsprechenden Amazon-Ressourcennamen (ARN) in Ihren Endpunkteinstellungen, wenn Sie Ihren lokalen Kafka-Zielendpunkt erstellen.

So bereiten Sie einen privaten Schlüssel und ein signiertes Zertifikat für einen selbstverwalteten Kafka-Cluster vor
  1. Erzeugen Sie ein Schlüsselpaar wie im folgenden Beispiel dargestellt.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Generieren Sie eine Anforderung zum Signieren eines Zertifikats ()CSR.

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Verwenden Sie die CA in Ihrem Cluster-Truststore, um das CSR zu signieren. Wenn Sie keine CA haben, können Sie Ihre eigene private CA erstellen.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importieren Sie ca-cert in den Truststore und Keystore des Servers. Wenn Sie keinen Truststore haben, führen Sie folgenden Befehl aus, um den Truststore zu erstellen und ca-cert in diesen zu importieren.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Signieren Sie das Zertifikat.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importieren Sie das signierte Zertifikat in den Keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Verwenden Sie den folgenden Befehl, um den Schlüssel on-premise-rsa von kafka.server.keystore.jks in keystore.p12 zu importieren.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Verwenden Sie den folgenden Befehl, um keystore.p12 in das Format .pem zu exportieren.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Laden Sie encrypted-private-server-key.pemsigned-certificate.pem, und in ca-cert den DMS Zertifikatsmanager hoch.

  10. Erstellen Sie einen Endpunkt mithilfe des zurückgegebenenARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Verwenden von SASL — SSL Authentifizierung für die Verbindung zu Amazon MSK

Die Methode Simple Authentication and Security Layer (SASL) verwendet einen Benutzernamen und ein Passwort, um die Identität eines Kunden zu überprüfen, und stellt eine verschlüsselte Verbindung zwischen Server und Client her.

Für die Verwendung SASL erstellen Sie zunächst einen sicheren Benutzernamen und ein sicheres Passwort, wenn Sie Ihren MSK Amazon-Cluster einrichten. Eine Beschreibung, wie Sie einen sicheren Benutzernamen und ein sicheres Passwort für einen MSK Amazon-Cluster einrichten, finden Sie unter Setting SASL SCRAM up/authentication for a Amazon MSK Cluster im Amazon Managed Streaming for Apache Kafka Developer Guide.

Wenn Sie dann Ihren Kafka-Zielendpunkt erstellen, legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option sasl-ssl fest. Legen Sie auch die Optionen SaslUsername und SaslPassword fest. Stellen Sie sicher, dass diese mit dem sicheren Benutzernamen und dem Passwort übereinstimmen, die Sie bei der ersten Einrichtung Ihres MSK Amazon-Clusters erstellt haben, wie im folgenden JSON Beispiel gezeigt.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Anmerkung
  • AWS DMS Unterstützt derzeit nur öffentliche, von CA gestützte SASL -SSL. DMSunterstützt nicht SASL — SSL zur Verwendung mit selbstverwaltetem Kafka, das von einer privaten CA unterstützt wird.

  • Für die SASL SSL Authentifizierung wird AWS DMS standardmäßig der Mechanismus SCRAM SHA -512 unterstützt. AWS DMS Versionen 3.5.0 und höher unterstützen auch den Plain-Mechanismus. Um den Plain-Mechanismus zu unterstützen, setzen Sie den SaslMechanism Parameter des KafkaSettings API Datentyps aufPLAIN.

Verwenden Sie ein Vorher-Bild, um die ursprünglichen Werte von CDC Zeilen für Apache Kafka als Ziel anzuzeigen

Wenn Sie CDC Aktualisierungen in ein Datenstreaming-Ziel wie Kafka schreiben, können Sie die ursprünglichen Werte einer Quelldatenbankzeile anzeigen, bevor sie durch ein Update geändert werden. Um dies zu ermöglichen, AWS DMS füllt es ein Vorher-Bild von Aktualisierungsereignissen auf der Grundlage von Daten aus, die von der Quelldatenbank-Engine bereitgestellt werden.

Verschiedene Quelldatenbank-Engines liefern unterschiedliche Mengen an Informationen für ein Vorher-Abbild:

  • Oracle stellt für Spalten nur dann Aktualisierungen bereit, wenn sie sich ändern.

  • Postgre SQL stellt nur Daten für Spalten bereit, die Teil des Primärschlüssels sind (geändert oder nicht). Wenn die logische Replikation verwendet wird und für die Quelltabelle festgelegt REPLICA IDENTITY FULL ist, können Sie vollständige Vorher-Nachher-Informationen zu der Zeile abrufen, die in die Zeile geschrieben wurde WALs und hier verfügbar ist.

  • My stellt SQL im Allgemeinen Daten für alle Spalten bereit (geändert oder nicht).

Verwenden Sie entweder die BeforeImageSettings-Aufgabeneinstellung oder den add-before-image-columns-Parameter, um die Erstellung von Vorher-Abbildern zum Hinzufügen von Originalwerten aus der Quelldatenbank zur AWS DMS -Ausgabe zu aktivieren. Dieser Parameter wendet eine Spalten-Transformationsregel an.

BeforeImageSettingsfügt jedem Aktualisierungsvorgang mit Werten, die aus dem Quelldatenbanksystem gesammelt wurden, ein neues JSON Attribut hinzu, wie im Folgenden gezeigt.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Anmerkung

Gilt für BeforeImageSettings CDC Aufgaben mit Volllast plus (bei denen vorhandene Daten migriert und laufende Änderungen repliziert werden) oder CDC nur auf Aufgaben (bei denen nur Datenänderungen repliziert werden). Wenden Sie BeforeImageSettings nicht auf Nur-Volllast-Aufgaben an.

Für BeforeImageSettings-Optionen gilt Folgendes:

  • Legen Sie die EnableBeforeImage-Option vor dem Imaging auf true fest. Der Standardwert ist false.

  • Verwenden Sie die FieldName Option, um dem neuen JSON Attribut einen Namen zuzuweisen. Wann EnableBeforeImage true ist, ist FieldName erforderlich und darf nicht leer sein.

  • Die ColumnFilter-Option gibt eine Spalte an, die vor dem Imaging hinzugefügt werden soll. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, verwenden Sie den Standardwert pk-only. Um nur Spalten hinzuzufügen, die nicht vom LOB Typ sind, verwenden Sienon-lob. Wenn Sie eine Spalte hinzufügen möchten, die einen Vorher-Abbild-Wert hat, verwenden Sie all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Verwenden einer Vorher-Abbild-Transformationsregel

Alternativ zu den Aufgabeneinstellungen können Sie den add-before-image-columns-Parameter verwenden, der eine Spalten-Transformationsregel anwendet. Mit diesem Parameter können Sie das Imaging CDC auf Datenstreaming-Zielen wie Kafka vor dem Start aktivieren.

Wenn Sie add-before-image-columns in einer Transformationsregel verwenden, können Sie eine feinere Steuerung der Ergebnisse für das Vorher-Abbild anwenden. Mit Transformationsregeln können Sie einen Objekt-Locator verwenden, der Ihnen die Kontrolle über die für die Regel ausgewählten Tabellen gibt. Außerdem können Sie Transformationsregeln miteinander verketten, wodurch verschiedene Regeln auf verschiedene Tabellen angewendet werden können. Anschließend können Sie die erzeugten Spalten mithilfe anderer Regeln bearbeiten.

Anmerkung

Verwenden Sie den add-before-image-columns-Parameter nicht zusammen mit der BeforeImageSettings-Aufgabeneinstellung innerhalb derselben Aufgabe. Verwenden Sie stattdessen entweder den Parameter oder die Einstellung, aber nicht beide, für eine einzelne Aufgabe.

Ein transformation-Regeltyp mit dem add-before-image-columns-Parameter für eine Spalte muss einen before-image-def-Abschnitt bereitstellen. Es folgt ein Beispiel.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

Der Wert von column-prefix wird einem Spaltennamen vorangestellt, und der Standardwert von column-prefix ist BI_. Der Wert von column-suffix wird an den Spaltennamen angehängt, und der Standardwert ist leer. Setzen Sie nicht column-prefix und column-suffix auf leere Zeichenfolgen.

Wählen Sie einen Wert für column-filter. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, wählen Sie pk-only . Wählen Sie ausnon-lob, dass nur Spalten hinzugefügt werden sollen, die nicht vom LOB Typ sind. Oder lassenall Sie eine Spalte hinzufügen, die einen Vorher-Abbild-Wert hat.

Beispiel für eine Vorher-Abbild-Transformationsregel

Die Transformationsregel im folgenden Beispiel fügt eine neue Spalte mit dem Namen BI_emp_no auf dem Ziel hinzu. Eine Anweisung wie UPDATE employees SET emp_no = 3 WHERE emp_no = 1; füllt daher das BI_emp_no Feld mit 1. Wenn Sie CDC Aktualisierungen für Amazon S3 S3-Ziele schreiben, können Sie anhand der BI_emp_no Spalte erkennen, welche ursprüngliche Zeile aktualisiert wurde.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Weitere Informationen zur Verwendung der add-before-image-columns-Regelaktion finden Sie unter Transformationsregeln und Aktionen.

Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service

Bei der Verwendung von Apache Kafka als Ziel gelten die folgenden Einschränkungen:

  • AWS DMS Kafka-Zielendpunkte unterstützen keine IAM Zugriffskontrolle für Amazon Managed Streaming for Apache Kafka (Amazon). MSK

  • LOBDer Vollmodus wird nicht unterstützt.

  • Geben Sie eine Kafka-Konfigurationsdatei für Ihren Cluster mit Eigenschaften AWS DMS an, mit denen Sie automatisch neue Themen erstellen können. Schließen Sie die Einstellung auto.create.topics.enable = true ein. Wenn Sie Amazon verwendenMSK, können Sie bei der Erstellung Ihres Kafka-Clusters die Standardkonfiguration angeben und die auto.create.topics.enable Einstellung dann auf true ändern. Weitere Informationen zu den Standardkonfigurationseinstellungen finden Sie unter Die MSK Amazon-Standardkonfiguration im Amazon Managed Streaming for Apache Kafka Developer Guide. Wenn Sie einen vorhandenen Kafka-Cluster ändern müssen, der mit Amazon erstellt wurdeMSK, führen Sie den AWS CLI Befehl aus, aws kafka create-configuration um Ihre Kafka-Konfiguration zu aktualisieren, wie im folgenden Beispiel gezeigt:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Hier ist //~/kafka_configuration die Konfigurationsdatei, die Sie mit den erforderlichen Eigenschaftseinstellungen erstellt haben.

    Wenn Sie Ihre eigene Kafka-Instance verwenden, die auf Amazon installiert istEC2, ändern Sie die Kafka-Cluster-Konfiguration mit der auto.create.topics.enable = true Einstellung, dass mithilfe der in Ihrer Instance bereitgestellten Optionen automatisch neue Themen erstellt werden können AWS DMS .

  • AWS DMS veröffentlicht unabhängig von den Transaktionen jedes Update eines einzelnen Datensatzes in der Quelldatenbank als einen Datensatz (Nachricht) in einem bestimmten Kafka-Thema.

  • AWS DMS unterstützt die folgenden zwei Formen für Partitionsschlüssel:

    • SchemaName.TableName: Eine Kombination des Schema- und Tabellennamens.

    • ${AttributeName}: Der Wert eines der Felder in der JSON oder der Primärschlüssel der Tabelle in der Quelldatenbank.

  • BatchApply wird für einen Kafka-Endpunkt nicht unterstützt. Bei Verwendung von Batch Apply (z. B. der Zielmetadaten-Aufgabeneinstellung BatchApplyEnabled) für ein Kafka-Ziel kann es zu einem Datenverlust kommen.

  • AWS DMS unterstützt nicht die Migration von Werten eines BigInt Datentyps mit mehr als 16 Ziffern. Um diese Einschränkung zu umgehen, können Sie die folgende Transformationsregel verwenden, um die BigInt-Spalte in eine Zeichenfolge zu konvertieren. Informationen zu Transformationsregeln finden Sie unter Transformationsregeln und Aktionen.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Verwenden der Objektzuweisung zum Migrieren von Daten zu einem Kafka-Thema

AWS DMS verwendet Regeln für die Tabellenzuordnung, um Daten aus der Quelle dem Kafka-Zielthema zuzuordnen. Um Daten einem Zielthema zuzuweisen, verwenden Sie eine Art von Tabellenzuweisungsregel, die als Objektzuweisung bezeichnet wird. Mit der Objektzuweisung legen Sie fest, wie Datensätze in der Quelle den in einem Kafka-Thema veröffentlichten Datensätzen zugewiesen werden.

Kafka-Themen verfügen bis auf einen Partitionsschlüssel über keine voreingestellte Struktur.

Anmerkung

Sie müssen die Objektzuweisung nicht verwenden. Sie können die reguläre Tabellenzuweisung für verschiedene Transformationen verwenden. Der Partitionsschlüsseltyp folgt jedoch diesen Standardverhaltensweisen:

  • Der Primärschlüssel wird als Partitionsschlüssel für Volllast verwendet.

  • Wird als Partitionsschlüssel für verwendet, wenn keine Aufgabeneinstellungen zur parallelen Anwendung schema.table verwendet werden. CDC

  • Wenn Aufgabeneinstellungen mit parallelem Anwenden verwendet werden, wird der Primärschlüssel als Partitionsschlüssel für verwendet. CDC

Um eine Objektzuweisungsregel zu erstellen, legen Sie rule-type als object-mapping fest. Diese Regel gibt an, welchen Objektzuweisungstyp Sie verwenden möchten.

Die Struktur für die Regel lautet wie folgt.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS unterstützt derzeit map-record-to-record und map-record-to-document als einzig gültige Werte für den rule-action Parameter. Diese Einstellungen wirken sich auf Werte aus, die nicht in der exclude-columns-Attributliste ausgeschlossen sind. Die map-record-to-document Werte map-record-to-record und geben an, wie diese Datensätze standardmäßig AWS DMS behandelt werden. Diese Werte wirken sich in keiner Weise auf die Attributzuweisungen aus.

Verwenden Sie map-record-to-record beim Migrieren aus einer relationalen Datenbank zu einem Kafka-Thema. Dieser Regeltyp verwendet den taskResourceId.schemaName.tableName-Wert aus der relationalen Datenbank als Partitionsschlüssel in dem Kafka-Thema und erstellt ein Attribut für jede Spalte in der Quelldatenbank.

Beachten Sie bei Verwendung von map-record-to-record Folgendes:

  • Diese Einstellung wirkt sich nur auf Spalten aus, die durch die exclude-columns-Liste ausgeschlossen wurden.

  • AWS DMS Erstellt für jede dieser Spalten ein entsprechendes Attribut im Zielthema.

  • AWS DMS erstellt dieses entsprechende Attribut unabhängig davon, ob die Quellspalte in einer Attributzuordnung verwendet wird.

Eine Möglichkeit, map-record-to-record zu verstehen, besteht darin, sich die praktische Anwendung zu veranschaulichen. In diesem Beispiel wird davon ausgegangen, dass Sie mit einer Tabellenzeile einer relationalen Datenbank beginnen, die die folgende Struktur aufweist und die folgenden Daten enthält.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Um diese Informationen von einem Schema mit dem Namen Test zu einem Kafka-Thema zu migrieren, erstellen Sie Regeln, um die Daten dem Zielthema zuzuweisen. Die folgende Regel veranschaulicht die Zuweisung.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Nachfolgend wird das resultierende Datensatzformat bei Verwendung unserer Beispieldaten in dem Kafka-Zielthema für ein bestimmtes Kafka-Thema und einen bestimmten Partitionsschlüssel (in diesem Fall taskResourceId.schemaName.tableName) illustriert.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Umstrukturieren von Daten mit Attributzuweisung

Sie können die Daten umstrukturieren, während Sie sie mittels einer Attributzuweisung zu einem Kafka-Thema migrieren. So möchten Sie zum Beispiel vielleicht mehrere Felder in der Quelle in einem einzigen Feld im Ziel vereinen. Die folgenden Attributzuordnung veranschaulicht, wie die Daten umstrukturiert werden.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Um einen konstanten Wert für partition-key festzulegen, geben Sie einen partition-key-Wert an. So könnten Sie auf diese Weise beispielsweise erzwingen, dass alle Daten in einer einzigen Partition gespeichert werden. Die folgende Darstellung veranschaulicht dieses Konzept.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Anmerkung

Der partition-key-Wert für einen Steuerungsdatensatz, der für eine spezifische Tabelle bestimmt ist, lautet TaskId.SchemaName.TableName. Der partition-key-Wert für einen Steuerungsdatensatz, der für eine spezifische Aufgabe bestimmt ist, ist die TaskId des betreffenden Datensatzes. Wenn Sie einen partition-key-Wert in der Objektzuweisung angeben, hat dies keine Auswirkungen auf den partition-key für einen Steuerungsdatensatz.

Replikation für mehrere Themen mithilfe der Objektzuweisung

Standardmäßig migrieren AWS DMS Aufgaben alle Quelldaten zu einem der folgenden Kafka-Themen:

  • Wie im Feld Thema des AWS DMS Zielendpunkts angegeben.

  • Wie von kafka-default-topic angegeben, wenn das Feld Thema des Zielendpunkts nicht gefüllt ist und die Kafka-Einstellung auto.create.topics.enable auf true gesetzt ist.

Bei AWS DMS Engine-Versionen 3.4.6 und höher können Sie das kafka-target-topic Attribut verwenden, um jede migrierte Quelltabelle einem separaten Thema zuzuordnen. Mit den folgenden Objektzuweisungsregeln werden beispielsweise die Quelltabellen Customer und Address zu den Kafka-Themen customer_topic bzw. address_topic migriert. AWS DMS Migriert gleichzeitig alle anderen Quelltabellen, einschließlich der Bills Tabelle im Test Schema, zu dem im Zielendpunkt angegebenen Thema.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Mithilfe der Kafka-Replikation für mehrere Themen können Sie Quelltabellen mit einer einzigen Replikationsaufgabe gruppieren und zu separaten Kafka-Themen migrieren.

Nachrichtenformat für Apache Kafka

Die JSON Ausgabe ist einfach eine Liste von Schlüssel-Wert-Paaren.

RecordType

Der Datensatztyp kann entweder für Daten oder zur Steuerung bestimmt sein. Datensätze für Datenrepräsentieren die tatsächlichen Zeilen in der Quelle. Steuerungsdatensätze sind für wichtige Ereignisse im Stream bestimmt, z. B. einen Neustart der Aufgabe.

Operation

Mögliche Operationen für Datensätze sind load, insert, update oder delete.

Mögliche Operationen für Steuerungsdatensätze sind create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column oder column-type-change.

SchemaName

Das Quellschema für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.

TableName

Die Quelltabelle für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.

Zeitstempel

Der Zeitstempel für den Zeitpunkt, JSON an dem die Nachricht erstellt wurde. Das Feld ist mit dem ISO 8601-Format formatiert.

Das folgende JSON Nachrichtenbeispiel veranschaulicht einen Datentyp „Nachricht“ mit allen zusätzlichen Metadaten.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

Das folgende JSON Nachrichtenbeispiel veranschaulicht eine Meldung vom Typ Steuerelement.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }