閱讀英文

共用方式為


使用 MirrorMaker 2 在不同的 Azure HDInsight 版本之間移轉 Kafka 叢集

了解如何使用 Apache Kafka 的鏡像功能,將主題複寫至次要叢集。 您可以執行鏡像作為連續程序,或間歇性執行,以將資料從一個叢集移轉至另一個叢集。

在本文中,您會使用鏡像在兩個 HDInsight 叢集之間複寫主題。 這些叢集位於不同資料中心的不同虛擬網路中。

注意

  1. 我們可以使用鏡像叢集作為容錯。
  2. 這只適用於主要叢集 HDI Kafka 2.4.1、3.2.0,而次要叢集是 HDI Kafka 3.2.0 版。
  3. 如果您的主要叢集關閉,次要叢集會無縫地運作。
  4. 取用者群組位移會自動轉換為次要叢集。
  5. 只需將主要叢集取用者指向具有相同取用者群組的次要叢集,取用者群組就會從其在主要叢集中留下的位移開始取用。
  6. 唯一的差異在於備份叢集中的主題名稱將從 TOPIC_NAME 變更為 primary-cluster-name.TOPIC_NAME。

Apache Kafka 鏡像的運作方式

鏡像的運作方式是使用 MirrorMaker2 工具,這是 Apache Kafka 的一部分。 MirrorMaker 會從主要叢集上的主題取用記錄,然後在次要叢集上建立本機複本。 MirrorMaker2 會使用一個或多個從主要叢集讀取的取用者,以及一個寫入至本機 (次要) 叢集的生產者

災害復原的最有用鏡像設定會利用不同 Azure 區域中的 Kafka 叢集。 為了實現此結果,叢集所在的虛擬網路會一起對等互連。

主要與次要叢集的節點與磁碟分割數目可能有所不同,且主題內的位移也會不同。 鏡像會維護用於資料分割的金鑰值,因此會根據每個金鑰保留記錄順序。

跨網路界限鏡像

如果您需要在不同網路中的 Kafka 叢集之間進行鏡像處理,有下列其他考量︰

  • 閘道:網路必須能夠在 TCP/IP 層級進行通訊。

  • 伺服器定址:您可以選擇使用 IP 位址或完整網域名稱來定址叢集節點。

    • IP 位址:如果您將 Kafka 叢集設定為使用 IP 位址公告,則可以使用訊息代理程式節點和 Zookeeper 節點的 IP 位址,繼續進行鏡像設定。

    • 網域名稱:如果您未設定 Kafka 叢集進行 IP 位址公告,叢集必須能夠使用完整網域名稱 (FQDN) 彼此連接。 這需要每個網路中的網域名稱系統 (DNS) 伺服器設定為將要求轉送到其他網路。 建立 Azure 虛擬網路時,您必須指定自訂 DNS 伺服器和伺服器的 IP 位址,而不是使用隨著網路提供的自動 DNS。 建立虛擬網路之後,您必須接著建立使用該 IP 位址的 Azure 虛擬機器。 然後您會在其上安裝和設定 DNS 軟體。

    重要

    先建立和設定自訂 DNS 伺服器,然後再將 HDInsight 安裝到虛擬網路中。 HDInsight 不需要進行其他設定,即可使用針對虛擬網路設定的 DNS 伺服器。

如需有關如何連接兩個 Azure 虛擬網路的詳細資訊,請參閱設定連線

鏡像結構

此結構會在不同的資源群組和虛擬網路中提供兩個叢集:主要和次要。

建立步驟

  1. 建立兩個新的資源群組:

    資源群組 Location
    kafka-primary-rg 美國中部
    kafka-secondary-rg 美國中北部
  2. kafka-primary-rg 中建立新的虛擬網路 kafka-primary-vnet。 保留預設設定。

  3. kafka-secondary-rg 中建立新的虛擬網路 kafka-secondary-vnet,也具有預設設定。

    注意

    保持這兩個 vnet 的位址不重疊,否則 vnet 對等互連將無法運作。 範例:

    1. kafka-primary-vnet 可以具有位址空間 10.0.0.0
    2. kafka-secondary-vnet 可以具有位址空間 10.1.0.0
  4. 建立虛擬網路對等互連。 此步驟會建立兩個對等互連:一個從 kafka-primary-vnetkafka-secondary-vnet,另一個從 kafka-secondary-vnet 回到 kafka-primary-vnet

    1. 選取 kafka-primary-vnet 虛擬網路。

    2. 在 [設定] 底下,選取 [對等互連]

    3. 選取 [新增]。

    4. 在 [新增對等互連] 畫面上輸入詳細資料,如下列螢幕擷取畫面所示。

      Screenshot that shows HDInsight Kafka add virtual network peering primary to secondary.Screenshot that shows HDInsight Kafka add virtual network peering from secondary to primary.

  5. 建立兩個新的 Kafka 叢集:

    叢集名稱 HDInsight 版本 資源群組 虛擬網路 儲存體帳戶
    primary-kafka-cluster 5.0 kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    secondary-kafka-cluster 5.1 kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage

    注意

    從現在起,我們將使用 primary-kafka-cluster 作為 PRIMARYCLUSTER,以及使用 secondary-kafka-cluster 作為 SECONDARYCLUSTER

將 PRIMARYCLUSTER 背景工作節點的 IP 位址設定為用戶端電腦以進行 DNS 解析

  1. 使用 SECONDARYCLUSTER 的前端節點來執行鏡像製作者指令碼。 然後在 SECONDARYCLUSTER/etc/hosts 檔案中,我們需要 PRIMARYCLUSTER 背景工作節點的 IP 位址。

  2. 連線到 PRIMARYCLUSTER

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    
  3. 執行下列命令,並取得背景工作節點 IP 和 FQDN cat /etc/hosts 的項目

  4. 複製這些專案並連線到 SECONDARYCLUSTER,然後執行

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net` 
    
  5. 編輯次要叢集的 /etc/hosts 檔案,並在這裡新增這些項目。

  6. 在您進行變更之後,SECONDARYCLUSTER/etc/hosts 檔案看起來就像指定的影像。

    Screenshot that shows etc hosts file output.

  7. 儲存並關閉檔案。

在 PRIMARYCLUSTER 中建立多個主題

  1. 使用此命令來建立主題並取代變數。

    bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --create --topic $TOPICNAME --partitions $NUM_PARTITIONS --replication-factor $REPLICATION_FACTOR 
    

在 SECONDARYCLUSTER 中設定 MirrorMaker2

  1. 現在,變更 MirrorMaker2 屬性檔中的設定。

  2. 以管理員權限執行下列命令

    sudo su 
    vi /etc/kafka/conf/connect-mirror-maker.properties 
    

    注意

    本文會參考 Microsoft 不再使用的術語。 從軟體中移除該字詞時,我們也會將其從本文中移除。

  3. 屬性檔看起來像這樣。

    # specify any number of cluster aliases
    clusters = source, destination
    
    # connection information for each cluster
    # This is a comma separated host:port pairs for each cluster
    # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts source.bootstrap.servers = wn0-src kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092 destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
    # enable and configure individual replication flows
    source->destination.enabled = true
    
    # regex which defines which topics gets replicated. For eg "foo-.*"
    source->destination.topics = .*
    groups=.*
    topics.blacklist="*.internal,__.*"
    
    # Setting replication factor of newly created remote topics
    Replication.factor=3
    
    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1
    
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1
    
  4. 在這裡,來源是您的 PRIMARYCLUSTER,而目的地是您的 SECONDARYCLUSTR。 將其取代為正確的名稱,並將 source.bootstrap.serversdestination.bootstrap.servers 取代為其各自背景工作節點的正確 FQDN 或 IP。

  5. 您可以使用規則運算式來指定您想要複寫的主題及其設定。 藉由將 replication.factor 參數設定為 3,您可以確定 MirrorMaker 指令碼所建立的所有主題都有複寫因數 3。

  6. 針對這些主題,將複寫因數從 1 增加到 3

    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1
    
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1
    

    注意

    在訊息代理程式層級成為所有主題預設 insync 複本的原因為 2。 保留複寫因數=1,在執行 mirrormaker2 時會擲回例外狀況

  7. 您必須啟用自動主題建立功能,然後鏡像製作者指令碼會在次要叢集中複寫名稱為 PRIMARYCLUSTER.TOPICNAME 且具有相同設定的主題。 儲存檔案,我們順利完成了設定。

  8. 如果想要在兩端鏡像主題,例如 Primary to SecondarySecondary to Primary (主動-主動),您可以新增額外的設定

    destination->source.enabled=true
    destination->source.topics = .*
    
  9. 針對自動化的取用者位移同步,我們也必須啟用複寫並控制同步持續時間。 下列屬性會每隔 30 秒同步位移一次。 針對主動-主動案例,我們必須同時進行。

    groups=.* 
    
    emit.checkpoints.enabled = true 
    source->destination.sync.group.offsets.enabled = true 
    source->destination.sync.group.offsets.interval.ms=30000 
    
    destination->source.sync.group.offsets.enabled = true       
    destination->source.sync.group.offsets.interval.ms=30000
    
  10. 如果我們不想要跨叢集複寫內部主題,請使用下列屬性

    topics.blacklist="*.internal,__.*"    
    
  11. 變更後的最終設定檔應該看起來就像這樣

    # specify any number of cluster aliases
    clusters = primary-kafka-cluster, secondary-kafka-cluster
    
    # connection information for each cluster
    # This is a comma separated host:port pairs for each cluster
    # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari -> Hosts 
    primary-kafka-cluster.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092 
    secondary-kafka-cluster.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
    
    # enable and configure individual replication flows
    primary-kafka-cluster->secondary-kafka-cluster.enabled = true
    
    # enable this for both side replication
    secondary-kafka-cluster->primary-kafka-cluster.enabled = true
    
    # regex which defines which topics gets replicated. For eg "foo-.*"
    primary-kafka-cluster->secondary-kafka-cluster.topics = .*
    secondary-kafka-cluster->primary-kafka-cluster.topics = .*
    
    groups=.*
    emit.checkpoints.enabled = true 
    primary-kafka-cluster->secondary-kafka-cluster.sync.group.offsets.enabled=true 
    primary-kafka-cluster->secondary-kafka-cluster.sync.group.offsets.interval.ms=30000 
    secondary-kafka-cluster->primary-kafka-cluster.sync.group.offsets.enabled   = true 
    secondary-kafka-cluster->primary-kafka-cluster.sync.group.offsets.interval.ms=30000
    topics.blacklist="*.internal,__.*"
    
    # Setting replication factor of newly created remote topics
    Replication.factor=3
    
    checkpoints.topic.replication.factor=3
    heartbeats.topic.replication.factor=3
    offset-syncs.topic.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    config.storage.replication.factor=3
    
  12. SECONDARYCLUSTER 中啟動 Mirror Maker2,其應該正常執行

    /usr/hdp/current/kafka-broker 
    ./bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties 
    
  13. 現在於 PRIMARYCLUSTER 中啟動生產者

    export clusterName='primary-kafka-cluster' 
    export TOPICNAME='TestMirrorMakerTopic' 
    export KAFKABROKERS='wn0-primar:9092' 
    export KAFKAZKHOSTS='zk0-primar:2181'
    
    //Start Producer
    
    # For Kafka 2.4 
    bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --zookeeper $KAFKAZKHOSTS --topic $TOPICNAME 
    # For Kafka 3.2 
    bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --boostrap-server $KAFKABROKERS --topic $TOPICNAME 
    
  14. 現在使用取用者群組啟動 PRIMARYCLUSTER 中的取用者

    //Start Consumer
    
    # For Kafka 2.4 
    bash /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --topic $TOPICNAME -–group my-group –-from- beginning
    
    # For Kafka 3.2 
    bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --boostrap-server $KAFKABROKERS --topic $TOPICNAME -–group my-group –-from-beginning  
    
  15. 現在停止 PRIMARYCONSUMER 中的取用者,並使用相同的取用者群組啟動 SECONDARYCLUSTER 中的取用者

    export clusterName='secondary-kafka-cluster'  
    
    export TOPICNAME='primary-kafka-cluster.TestMirrorMakerTopic'   
    
    export KAFKABROKERS='wn0-second:9092'  
    
    export KAFKAZKHOSTS='zk0-second:2181'  
    
    # List all the topics whether they're replicated or not 
    bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --list 
    
    # Start Consumer
    bash /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic $TOPICNAME --from-beginning 
    

    您可能注意到,在次要叢集取用者群組中,my-group 無法取用任何訊息,因為主要叢集取用者群組已取用。 現在會在主要叢集中產生更多訊息,然後嘗試在次要叢集中取用。 您能夠從 SECONDARYCLUSTER 取用。

刪除叢集

警告

不論使用與否,HDInsight 叢集都是按分鐘計費。 請務必在使用完叢集後將它刪除。 請參閱如何刪除 HDInsight 叢集

本文中的步驟會在不同的 Azure 資源群組中建立叢集。 若要刪除所有建立的資源,您可以刪除所建立的兩個資源群組:kafka-primary-rgkafka-secondary-rg。 刪除資源群組會移除遵循本文所建立的所有資源,包括叢集、虛擬網路和儲存體帳戶。

下一步

在本文中,您已了解如何使用 MirrorMaker2 來建立 Apache Kafka 叢集的複本。

使用下列連結來探索使用 Kafka 的其他方式︰