本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Apache Kafka 做為 的目標 AWS Database Migration Service
您可以使用 AWS DMS 將資料遷移至 Apache Kafka 叢集。Apache Kafka 是一個分散式串流平台。您可以使用 Apache Kafka 來即時擷取和處理串流資料。
AWS 也提供 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 做為 AWS DMS 目標使用。Amazon MSK 是一種全受管 Apache Kafka 串流服務,可簡化 Apache Kafka 執行個體的實作和管理。它適用於開放原始碼 Apache Kafka 版本,而且您可以像任何 Apache Kafka MSK執行個體一樣,將 Amazon 執行個體做為 AWS DMS 目標存取。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的什麼是 AmazonMSK?。
Kafka 叢集會記錄串流儲存在稱為主題的類別裡,其中具有分割區。分割區是主題中資料記錄 (訊息) 的唯一識別序列。分割區可以分佈在叢集中的多個中介裝置,以啟用主題記錄的平行處理。如需有關主題和分割區及其在 Apache Kafka 中分佈的詳細資訊,請參閱主題和日誌
您的 Kafka 叢集可以是 Amazon MSK執行個體、在 Amazon EC2執行個體上執行的叢集,或內部部署叢集。Amazon MSK執行個體或 Amazon EC2執行個體上的叢集可以是相同VPC或不同的執行個體。如果您的叢集為內部部署,您可以針對複寫執行個體使用自己的內部部署名稱伺服器來解析叢集的主機名稱。如需為複寫執行個體設定名稱伺服器的相關資訊,請參閱 使用自己的內部部署名稱伺服器。如需設定網路的詳細資訊,請參閱 設定複寫執行個體的網路。
使用 Amazon MSK叢集時,請確定其安全群組允許從您的複寫執行個體存取 。如需變更 Amazon MSK叢集安全群組的詳細資訊,請參閱變更 Amazon MSK叢集的安全群組。
AWS Database Migration Service 使用 將記錄發佈至 Kafka 主題JSON。在轉換期間, AWS DMS 會將來源資料庫中的每個記錄序列化為JSON格式的屬性值對。
若要將資料從任何受支援的資料來源遷移到目標 Kafka 叢集,可以使用物件對應。您可以使用物件映射來決定如何建構目標主題中的資料記錄。您也可以定義每份資料表的分割區索引鍵,Apache Kafka 用它將資料分組為分割區。
目前, AWS DMS 支援每個任務的單一主題。對於具有多個資料表的單一任務,所有訊息都會傳送至同一主題。每個訊息都包含中繼資料區段,可識別目標結構描述和 table. AWS DMS versions 3.4.6 和更新版本,支援使用物件映射的多主題複寫。如需詳細資訊,請參閱使用物件映射的多主題複製。
Apache Kafka 端點設定
您可以透過 AWS DMS 主控台中的端點設定或 中的 --kafka-settings
選項來指定連線詳細資訊CLI。每個設定的需求如下:
-
Broker
— 以逗號分隔清單的形式指定 Kafka 叢集中一個或多個代理程式的位置。
例如,broker-hostname
:port
"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"
。此設定可以指定叢集中任意或所有代理程式的位置。叢集中介裝置全部都會進行通訊,以處理遷移到主題的資料記錄分割。 -
Topic
– (選擇性) 指定主題名稱,長度上限為 255 個字母和符號。您可以使用句號 (.)、底線 (_) 和減號 (-)。具有句點 (.) 或底線 (_) 的主題名稱可能會在內部資料結構中發生衝突。在主題名稱中使用其中一個符號,不要同時使用這兩個符號。如果您未指定主題名稱, AWS DMS 會使用"kafka-default-topic"
做為遷移主題。注意
若要 AWS DMS 建立您指定的遷移主題或預設主題,請將
auto.create.topics.enable = true
設定為 Kafka 叢集組態的一部分。如需詳細資訊,請參閱 使用 Apache Kafka 做為 目標的限制 AWS Database Migration Service MessageFormat
– 在端點建立之記錄的輸出格式。訊息格式為JSON
(預設) 或JSON_UNFORMATTED
(不含製表符的單行)。-
MessageMaxBytes
– 在端點上建立之記錄的大小上限 (以位元組為單位)。預設值為 1,000,000。注意
您只能使用 AWS CLI/SDK
MessageMaxBytes
來變更為非預設值。例如,若要修改現有的 Kafka 端點並變更MessageMaxBytes
,請使用以下命令。aws dms modify-endpoint --endpoint-arn
your-endpoint
--kafka-settings Broker="broker1-server
:broker1-port
,broker2-server
:broker2-port
,...", Topic=topic-name
,MessageMaxBytes=integer-of-max-message-size-in-bytes
IncludeTransactionDetails
– 提供來源資料庫的詳細交易資訊。此資訊包括遞交時間戳記、記錄位置,以及transaction_id
、previous_transaction_id
,和transaction_record_id
(交易內的記錄位移) 的值。預設值為false
。IncludePartitionValue
– 在 Kafka 訊息輸出中顯示分割區值,除非分割區類型為schema-table-type
。預設值為false
。PartitionIncludeSchemaTable
– 當磁碟分割類型為primary-key-type
時,將結構描述和資料表名稱作為磁碟分割值的前綴。這樣做會增加 Kafka 分割區之間的資料分佈。例如,假設SysBench
結構描述有數千個資料表,而每個資料表的主索引鍵只有有限的範圍。在這種情況下,相同的主金鑰會從數千個資料表發送到相同的分割區,而這將導致調節。預設值為false
。IncludeTableAlterOperations
– 包含變更控制資料中資料表的任何資料定義語言 (DDL) 操作,例如rename-table
、drop-table
、add-column
、drop-column
和rename-column
。預設值為false
。IncludeControlDetails
– 在 Kafka 訊息輸出中顯示資料表定義、欄位定義以及資料表和欄位變更的詳細控制資訊。預設值為false
。-
IncludeNullAndEmpty
– 在目標中包含 NULL和空白資料欄。預設值為false
。 -
SecurityProtocol
– 使用 Transport Layer Security () 設定 Kafka 目標端點的安全連線TLS。選項包括ssl-authentication
、ssl-encryption
和sasl-ssl
。使用sasl-ssl
將需要SaslUsername
和SaslPassword
。 -
SslEndpointIdentificationAlgorithm
– 設定憑證的主機名稱驗證。3.5.1 版和更新版本支援 AWS DMS 此設定。選項包括下列項目:NONE
:在用戶端連線中停用代理程式的主機名稱驗證。HTTPS
:在用戶端連線中啟用代理程式的主機名稱驗證。
-
useLargeIntegerValue
– 使用最多 18 位數整數,而不是將整數轉換為雙組,可從 3.5.4 AWS DMS 版取得。預設值為 false。
您可以使用設定來協助提高傳輸速度。由於要執行此操作, AWS DMS 要支援對一個 Apache Kafka 目標叢集執行多執行緒完全載入。 AWS DMS 透過包含下列項目的任務設定以支援此多執行緒:
-
MaxFullLoadSubTasks
– 使用此選項來指出要平行載入的來源資料表數量上限。 會使用專用子任務,將每個資料表 AWS DMS 載入其對應的 Kafka 目標資料表。預設值為 8;最大值為 49。 -
ParallelLoadThreads
– 使用此選項指定 AWS DMS 用於將每個資料表載入其 Kafka 目標資料表的執行緒數目。Apache Kafka 目標的最大值為 32。您可以要求提高此上限。 -
ParallelLoadBufferSize
– 使用此選項指定平行載入執行緒將資料載入至 Kafka 目標時,緩衝區中存放的記錄數量上限。預設值為 50。最大值為 1000。使用此設定搭配ParallelLoadThreads
;ParallelLoadBufferSize
,只有在有多個執行緒時才有效。 -
ParallelLoadQueuesPerThread
:使用此選項指定每個並行執行緒存取的佇列數目,以便將資料記錄從佇列中取出,並為目標產生批次載入。預設為 1。最多 512 個。
您可以調校平行執行緒和大量操作的任務設定,以改善 Kafka 端點的變更資料擷取 (CDC) 的效能。若要執行此操作,您可以指定並行執行緒數目、每個執行緒的佇列數,以及使用 ParallelApply*
任務設定儲存在緩衝區中的記錄數目。例如,假設您想要執行CDC載入並平行套用 128 個執行緒。您也想要每個執行緒存取 64 個佇列,且每個緩衝區儲存 50 筆記錄。
為了提升CDC效能, AWS DMS 支援下列任務設定:
-
ParallelApplyThreads
– 指定在CDC載入期間 AWS DMS 用來將資料記錄推送至 Kafka 目標端點的並行執行緒數目。預設值為零 (0),最大值為 32。 -
ParallelApplyBufferSize
– 指定在每個緩衝區佇列中存放的記錄數目上限,以便並行執行緒在CDC載入期間推送至 Kafka 目標端點。預設值為 100,最大值為 1,000。ParallelApplyThreads
指定多個執行緒時,請使用此選項。 -
ParallelApplyQueuesPerThread
– 指定每個執行緒存取的佇列數量,以將資料記錄從佇列中移出,並在 期間為 Kafka 端點產生批次載入CDC。預設為 1。最多 512 個。
在使用 ParallelApply*
任務設定時,partition-key-type
預設是資料表的 primary-key
,而非 schema-name.table-name
。
使用 Transport Layer Security 連線至 Kafka (TLS)
Kafka 叢集接受使用 Transport Layer Security () 的安全連線TLS。使用 DMS時,您可以使用下列三個安全通訊協定選項中的任何一個來保護 Kafka 端點連線。
- SSL 加密 (
server-encryption
) -
用戶端會透過伺服器的憑證來驗證伺服器身分。接著在伺服器和用戶端之間建立加密連線。
- SSL 身分驗證 (
mutual-authentication
) -
伺服器和用戶端透過自己的憑證互相驗證身分。接著在伺服器和用戶端之間建立加密連線。
- SASL-SSL (
mutual-authentication
) -
Simple Authentication and Security Layer (SASL) 方法以使用者名稱和密碼取代用戶端的憑證,以驗證用戶端身分。具體來說,您需要提供伺服器已註冊的使用者名稱和密碼,讓伺服器驗證用戶端的身分。接著在伺服器和用戶端之間建立加密連線。
重要
Apache Kafka 和 Amazon MSK接受已解析的憑證。這是MSK要解決的 Kafka 和 Amazon 的已知限制。如需詳細資訊,請參閱 Apache Kafka 問題 KAFKA-3700
如果您使用的是 Amazon MSK,請考慮使用存取控制清單 (ACLs) 做為此已知限制的解決方法。如需使用 的詳細資訊ACLs,請參閱 Amazon Managed Streaming for Apache Kafka ACLs 開發人員指南中的 Apache Kafka 一節。
如果您使用的是自我管理的 Kafka 叢集,請參閱 2018 年 10 月 21 日的註解
搭配 Amazon MSK或自我管理的 Kafka 叢集使用SSL加密
您可以使用SSL加密來保護 Amazon MSK或自我管理 Kafka 叢集的端點連線。當您使用SSL加密身分驗證方法時,用戶端會透過伺服器的憑證驗證伺服器的身分。接著在伺服器和用戶端之間建立加密連線。
使用SSL加密連線至 Amazon MSK
-
建立目標 Kafka 端點時,請使用
ssl-encryption
選項設定安全通訊協定端點設定 (SecurityProtocol
)。以下JSON範例會將安全通訊協定設定為SSL加密。
"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
對自我管理的 Kafka 叢集使用SSL加密
-
如果您在內部部署 Kafka 叢集中使用私有憑證授權機構 (CA),請上傳私有 CA 憑證並取得 Amazon Resource Name (ARN)。
-
建立目標 Kafka 端點時,請使用
ssl-encryption
選項設定安全通訊協定端點設定 (SecurityProtocol
)。以下JSON範例會將安全通訊協定設定為ssl-encryption
。"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
-
如果您使用的是私有 CA,ARN請在上述第一步取得的
SslCaCertificateArn
中設定 。
使用SSL身分驗證
您可以使用SSL身分驗證來保護 Amazon MSK或自我管理 Kafka 叢集的端點連線。
若要使用身分驗證啟用用戶端SSL身分驗證和加密以連線至 Amazon MSK,請執行下列動作:
-
準備 Kafka 的私有金鑰和公有憑證。
-
將憑證上傳至DMS憑證管理員。
-
使用 Kafka 端點設定中ARNs指定的對應憑證建立 Kafka 目標端點。
為 Amazon 準備私有金鑰和公有憑證 MSK
-
建立EC2執行個體並設定用戶端以使用身分驗證,如 Amazon Managed Streaming for Apache Kafka 開發人員指南中的用戶端身分驗證一節中所述。
完成這些步驟後,您會擁有憑證 ARN(ARN儲存在 中的公有憑證ACM),以及
kafka.client.keystore.jks
檔案內包含的私有金鑰。 -
使用下列命令取得公用憑證,並將憑證複製到
signed-certificate-from-acm.pem
檔案中:aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
該命令會傳回類似於以下範例的資訊。
{"Certificate": "123", "CertificateChain": "456"}
然後,請將與
"123"
相等的項目複製到signed-certificate-from-acm.pem
文件中。 -
從
kafka.client.keystore.jks to keystore.p12
中匯入msk-rsa
金鑰以取得私有金鑰,如下列範例所示。keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
-
使用以下命令將
.pem
匯出為keystore.p12
格式。Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
出現 Enter PEM密碼片語訊息,並識別用於加密憑證的金鑰。
-
從
.pem
檔案中移除 bag 屬性和 key 屬性,以確定第一行的開頭字串如下。---BEGIN ENCRYPTED PRIVATE KEY---
將公有憑證和私有金鑰上傳到DMS憑證管理員,並測試與 Amazon 的連線 MSK
-
使用下列命令上傳至DMS憑證管理員。
aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://
path to signed cert
aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
-
建立 Amazon MSK目標端點並測試連線,以確保TLS身分驗證有效。
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
重要
您可以使用SSL身分驗證來保護與自我管理 Kafka 叢集的連線。在某些情況下,您可能會在內部部署 Kafka 叢集中使用私有憑證授權機構 (CA)。如果是,請將您的 CA 鏈、公有憑證和私有金鑰上傳到DMS憑證管理員。然後,當您建立內部部署 Kafka 目標端點時,請在端點設定中使用對應的 Amazon Resource Name (ARN)。
若要準備自我管理 Kafka 叢集的私有金鑰和簽署憑證
-
產生金鑰對,如下列範例所示。
keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass
your-keystore-password
-keypassyour-key-passphrase
-dname "CN=your-cn-name
" -aliasalias-of-key-pair
-storetype pkcs12 -keyalg RSA -
產生憑證簽署請求 (CSR)。
keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass
your-key-store-password
-keypassyour-key-password
-
使用叢集信任存放區中的 CA 來簽署 CSR。如果您沒有 CA,則可以建立自己的私有 CA。
openssl req -new -x509 -keyout ca-key -out ca-cert -days
validate-days
-
將
ca-cert
匯入至伺服器信任存放區和金鑰存放區。如果您沒有信任存放區,請使用下列命令建立信任存放區,並將ca-cert
匯入其中。keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
-
簽署憑證。
openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days
validate-days
-CAcreateserial -passin pass:ca-password
-
將簽署的憑證匯入金鑰存放區。
keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass
your-keystore-password
-keypassyour-key-password
-
使用以下命令將
on-premise-rsa
金鑰從kafka.server.keystore.jks
匯入至keystore.p12
。keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass
your-truststore-password
\ -destkeypassyour-key-password
-
使用以下命令將
.pem
匯出為keystore.p12
格式。Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
-
ca-cert
將encrypted-private-server-key.pem
、signed-certificate.pem
和 上傳至DMS憑證管理員。 -
使用傳回的 建立端點ARNs。
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "
your-client-cert-arn
","SslClientKeyArn": "your-client-key-arn
","SslClientKeyPassword":"your-client-key-password
", "SslCaCertificateArn": "your-ca-certificate-arn
"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
使用 SASL-SSL 身分驗證連線到 Amazon MSK
Simple Authentication and Security Layer (SASL) 方法使用使用者名稱和密碼來驗證用戶端身分,並在伺服器和用戶端之間進行加密連線。
若要使用 SASL,請先在設定 Amazon MSK叢集時建立安全的使用者名稱和密碼。如需如何為 Amazon MSK叢集設定安全使用者名稱和密碼的說明,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的設定SASL/SCRAM驗證 Amazon MSK叢集。
接著,當您建立 Kafka 目標端點時,請使用 sasl-ssl
選項設定安全通訊協定端點設定 (SecurityProtocol
)。您還可以設定 SaslUsername
和 SaslPassword
選項。請確定這些項目與您第一次設定 Amazon MSK叢集時建立的安全使用者名稱和密碼一致,如下列JSON範例所示。
"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"
Amazon MSK cluster secure user name
", "SaslPassword":"Amazon MSK cluster secure password
" }
注意
-
目前, 僅 AWS DMS 支援由私有 CA 支援的公有 CASASLSSL。 DMS 不支援 SASL-SSL 與由私有 CA 支援的自我管理 Kafka 搭配使用。
-
對於 SASL-SSL 身分驗證, 預設 AWS DMS 支援 SCRAM-SHA-512 機制。 3.5.0 版和更新 AWS DMS 版本也支援純機制。若要支援純機制,請將
KafkaSettings
API資料類型的SaslMechanism
參數設定為PLAIN
。
使用映像前的 來檢視 Apache Kafka 做為目標的資料CDC列原始值
將CDC更新寫入 Kafka 等資料串流目標時,您可以先檢視來源資料庫資料列的原始值,再進行更新變更。為了實現此目標, 會根據來源資料庫引擎提供的資料 AWS DMS ,填入更新事件之前的影像。
不同的來源資料庫引擎可提供不同的前映像資訊量:
-
Oracle 僅提供欄更新 (如果有變更的話)。
-
PostgreSQL 只會為屬於主索引鍵 (無論是否變更) 的資料欄提供資料。如果邏輯複寫正在使用中REPLICAIDENTITYFULL,且已針對來源資料表設定 ,則您可以在寫入 並在此處WALs提供的列上取得完整前後資訊。
-
我的SQL 通常會提供所有資料欄的資料 (無論是否變更)。
若要啟用前映像功能以從來源資料庫將原始值新增至 AWS DMS 輸出,請使用 BeforeImageSettings
任務設定或 add-before-image-columns
參數。此參數會套用欄轉換規則。
BeforeImageSettings
會將新的JSON屬性新增至每個更新操作,其中包含從來源資料庫系統收集的值,如下所示。
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意
BeforeImageSettings
套用至完全載入加上CDC任務 (遷移現有資料並複寫持續變更),或CDC僅適用於任務 (僅複寫資料變更)。請勿將 BeforeImageSettings
套用至僅限完全載入的任務。
針對 BeforeImageSettings
選項,適用的設定如下:
-
將
EnableBeforeImage
選項設為true
以啟用前映像功能。預設值為false
。 -
使用
FieldName
選項將名稱指派給新JSON屬性。若EnableBeforeImage
為true
,FieldName
則為必填,且不能留白。 -
ColumnFilter
選項會使用前映像來指定要新增的欄。若只要新增屬於資料表主索引鍵一部分的欄,請使用預設值pk-only
。若要僅新增非 LOB類型的資料欄,請使用non-lob
。若要新增任何具有前映像值的欄,請使用all
。"BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
使用前映像轉換規則
您可使用 add-before-image-columns
參數做為任務設定的替代方式,它會套用欄轉換規則。使用此參數,您可以在 Kafka 等資料串流目標CDC上於 進行映像之前啟用 。
只要在轉換規則中使用 add-before-image-columns
,即可套用前映像結果的更精細的控制。轉換規則可讓您使用物件定位器,以便掌控針對規則選取的資料表。此外,您可以將轉換規則鏈結在一起,讓不同的規則套用至不同的資料表。接著,您可以使用其他規則來操控產生的欄。
注意
請勿在同一個任務內將 add-before-image-columns
參數與 BeforeImageSettings
任務設定一起搭配使用。請改為將參數或設定 (擇一使用,而非兩者同時使用) 用於單一任務。
具有欄的 add-before-image-columns
參數的 transformation
規則類型必須提供 before-image-def
區段。下列顯示一個範例。
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
column-prefix
的值會附加至欄名稱前,而column-prefix
的預設值為 BI_
。column-suffix
的值會附加至欄名稱,而預設值為空白。請勿將 column-prefix
和 column-suffix
同時設為空白字串。
為 column-filter
選擇一個值。若只要新增屬於資料表主索引鍵一部分的欄,請選擇 pk-only
。選擇non-lob
僅新增非 LOB類型的資料欄。或者,選擇 all
以新增任何具有前映像值的欄。
前映像轉換規則範例
下列範例中的轉換規則會在目標中新增名為 BI_emp_no
的欄。因此,UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
等陳述式會以 1 填入 BI_emp_no
欄位。當您將CDC更新寫入 Amazon S3 目標時,資料BI_emp_no
欄可讓您知道哪個原始資料列已更新。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
如需使用 add-before-image-columns
規則動作的相關資訊,請參閱 轉換規則與動作。
使用 Apache Kafka 做為 目標的限制 AWS Database Migration Service
使用 Apache Kafka 做為目標時,有下列限制:
-
AWS DMS Kafka 目標端點不支援 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 的IAM存取控制。
-
不支援完整LOB模式。
-
為您的叢集指定 Kafka 組態檔案,其中包含允許 AWS DMS 自動建立新主題的屬性。包括設定,
auto.create.topics.enable = true
。如果您使用的是 Amazon MSK,您可以在建立 Kafka 叢集時指定預設組態,然後將auto.create.topics.enable
設定變更為true
。如需預設組態設定的詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的預設 Amazon MSK組態。 如果您需要修改使用 Amazon 建立的現有 Kafka 叢集MSK,請執行 AWS CLI 命令aws kafka create-configuration
來更新您的 Kafka 組態,如下列範例所示:14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }
在此,
//~/kafka_configuration
是您以必要屬性設定所建立的組態檔案。如果您使用的是安裝在 Amazon 上的自己的 Kafka 執行個體EC2,請使用執行個體隨附的選項,使用
auto.create.topics.enable = true
設定修改 Kafka 叢集組態, AWS DMS 以允許 自動建立新主題。 -
AWS DMS 會將每個更新發佈至來源資料庫中的單一記錄,做為指定 Kafka 主題中的一個資料記錄 (訊息),無論交易為何。
-
AWS DMS 支援下列兩種形式的分割區索引鍵:
-
SchemaName.TableName
:結構描述和資料表名稱的組合。 -
${AttributeName}
: 中其中一個欄位的值JSON,或來源資料庫中資料表的主要索引鍵。
-
-
Kafka 端點不支援
BatchApply
。對 Kafka 目標使用批次套用 (例如BatchApplyEnabled
目標中繼資料任務設定) 可能會導致資料遺失。 -
AWS DMS 不支援遷移超過 16 位數的
BigInt
資料類型值。若要解決此限制,您可以使用下列轉換規則將BigInt
資料欄轉換為字串。如需轉換規則的詳細資訊,請參閱 轉換規則與動作。{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
使用物件映射將資料遷移到 Kafka 主題
AWS DMS 使用資料表映射規則,將資料從來源映射到目標 Kafka 主題。若要將資料映射到目標主題,請使用一種稱為物件映射的資料表映射規則。您可以使用物件映射定義如何將來源中的資料記錄映射到發佈到 Kafka 主題的資料記錄。
除了擁有分割區索引鍵外,Kafka 主題沒有預設結構。
注意
您不一定要使用物件映射。您可以針對各種轉換使用一般資料表映射。不過,分割區索引鍵類型會遵循下列預設行為:
-
主索引鍵會作為「完全載入」的分割區索引鍵。
-
如果未使用平行套用任務設定,
schema.table
會做為 的分割區索引鍵CDC。 -
如果使用平行套用任務設定,則會使用主金鑰做為 的分割區金鑰CDC。
若要建立物件映射規則,請將 rule-type
指定為 object-mapping
。此規則指定您想要使用的物件映射類型。
規則的結構如下。
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS 目前支援 map-record-to-record
和 map-record-to-document
做為 rule-action
參數的唯一有效值。這些設定會影響 exclude-columns
屬性清單中未排除的值。map-record-to-record
和 map-record-to-document
值會指定預設 AWS DMS 如何處理這些記錄。這些值反正不會影響屬性映射。
從關聯式資料庫遷移到 Kafka 主題時,請使用 map-record-to-record
。此規則類型使用關聯式資料庫的 taskResourceId.schemaName.tableName
值做為 Kafka 主題的分割區索引鍵,並會為來源資料庫中的每一欄建立一個屬性。
使用 map-record-to-record
時,請注意下列事項:
此設定只會影響
exclude-columns
清單中排除的欄。對於每個這類資料欄, 會在目標主題中 AWS DMS 建立對應的屬性。
AWS DMS 無論屬性映射中是否使用來源資料欄, 都會建立此對應的屬性。
了解 map-record-to-record
的一種方法是查看它運作時的狀態。在本範例中,假設您開始使用之關聯式資料庫資料表資料列的結構和資料如下。
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
1988/02/29 |
若要將此資訊從名為 Test
的結構描述遷移至 Kafka 主題,您可以建立規則以將資料映射至目標主題。以下規則說明映射。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
指定 Kafka 主題和分割區索引鍵 (在本例中為 taskResourceId.schemaName.tableName
),下列說明使用我們在 Kafka 目標主題中的範例資料所產生的記錄格式:
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
使用屬性映射重組資料
您可以在使用屬性映射將資料遷移到 Kafka 主題的同時重組資料。例如,您可能想要將來源的幾個欄位合併為目標的單一欄位。以下屬性映射說明如何重組資料。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
若要設定 partition-key
的常數值,請指定一個 partition-key
值。例如,您可以執行此操作,將所有資料強制儲存在單一分割區中。以下映射說明此方法。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意
用於特定資料表的控制記錄 partition-key
值是 TaskId.SchemaName.TableName
。用於特定任務的控制記錄 partition-key
值是該記錄的 TaskId
。在物件映射中指定 partition-key
值對控制記錄的 partition-key
沒有影響。
使用物件映射的多主題複製
根據預設, AWS DMS 任務會將所有來源資料遷移至下列其中一個 Kafka 主題:
如 AWS DMS 目標端點的主題欄位中指定。
如果目標端點的主題欄位未填寫,且 Kafka
auto.create.topics.enable
設定為true
,則為kafka-default-topic
指定的主題。
使用 AWS DMS 引擎 3.4.6 版和更新版本時,您可以使用 kafka-target-topic
屬性,將每個遷移的來源資料表對應至個別主題。例如,物件映射規則隨後會將來源資料表 Customer
和 Address
分別遷移至 Kafka 主題 customer_topic
及 address_topic
。同時, 會將所有其他來源資料表,包括Test
結構描述中的Bills
資料表, AWS DMS 遷移至目標端點中指定的主題。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }
透過使用 Kafka 多主題複寫,您可以使用單一複寫任務將來源資料表來分組和遷移至個別 Kafka 主題。
Apache Kafka 訊息格式
JSON 輸出只是鍵/值對的清單。
- RecordType
-
記錄類型可以是資料或控制。資料記錄代表來源的實際資料列。控制記錄用於串流的重要事件,例如重新啟動任務。
- 作業
-
針對資料記錄,操作可以是
load
、insert
、update
或delete
。針對控制記錄,操作可以是
create-table
、rename-table
、drop-table
、change-columns
、add-column
、drop-column
、rename-column
或column-type-change
。 - SchemaName
-
記錄的來源結構描述。控制記錄的此欄位可以為空。
- TableName
-
記錄的來源資料表。控制記錄的此欄位可以為空。
- 時間戳記
-
訊息JSON建構時間的時間戳記。欄位的格式為 ISO8601 格式。
下列JSON訊息範例說明具有所有其他中繼資料的資料類型訊息。
{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }
下列JSON訊息範例說明控制類型訊息。
{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }