使用 MirrorMaker,透過 HDInsight 上的 Kafka 來複寫 Apache Kafka 主題
了解如何使用 Apache Kafka 的鏡像功能,將主題複寫至次要叢集。 您可以執行鏡像作為連續程序,或間歇性執行,以將資料從一個叢集移轉至另一個叢集。
在本文中,您將使用鏡像來複寫兩個 HDInsight 叢集之間的主題。 這些叢集位於不同資料中心的不同虛擬網路中。
警告
請勿使用鏡像作為達成容錯的方法。 主題內項目的位移在主要叢集與次要叢集之間有所不同,所以用戶端無法交替使用這兩者。 如果您很擔心容錯,您應該為叢集內的主題設定複寫。 如需詳細資訊,請參閱 開始使用 HDInsight 上的 Apache Kafka。
Apache Kafka 鏡像的運作方式
鏡像的運作方式是使用 MirrorMaker 工具,這是 Apache Kafka 的一部分。 MirrorMaker 會從主要叢集上的主題取用記錄,然後在次要叢集上建立本機複本。 MirrorMaker 會使用一個或多個從主要叢集讀取的取用者,以及一個寫入至本機 (次要) 叢集的生產者。
災害復原的最有用鏡像設定會利用不同 Azure 區域中的 Kafka 叢集。 為了達成此目的,叢集所在的虛擬網路會對等互連。
下圖說明鏡像程序以及通訊在叢集之間的流動方式:
主要與次要叢集的節點與磁碟分割數目可能有所不同,且主題內的位移也會不同。 鏡像會維護用於資料分割的機碼值,因此會根據每個機碼保留記錄順序。
跨網路界限鏡像
如果您需要在不同網路中的 Kafka 叢集之間進行鏡像,請注意下列其他考量:
閘道:網路必須能夠在 TCP/IP 層級進行通訊。
伺服器定址:您可以選擇使用 IP 位址或完整網域名稱來定址叢集節點。
IP 位址:如果您將 Kafka 叢集設定為使用 IP 位址公告,則可以使用訊息代理程式節點和 Zookeeper 節點的 IP 位址,繼續進行鏡像設定。
網域名稱:如果您未設定 Kafka 叢集進行 IP 位址公告,叢集必須能夠使用完整網域名稱 (FQDN) 彼此連接。 這需要每個網路中的網域名稱系統 (DNS) 伺服器設定為將要求轉送到其他網路。 建立 Azure 虛擬網路時,您必須指定自訂 DNS 伺服器和伺服器的 IP 位址,而不是使用隨著網路提供的自動 DNS。 建立虛擬網路之後,您必須接著建立使用該 IP 位址的 Azure 虛擬機器。 然後您會在其上安裝和設定 DNS 軟體。
重要
先建立和設定自訂 DNS 伺服器,然後再將 HDInsight 安裝到虛擬網路中。 HDInsight 不需要進行其他設定,即可使用針對虛擬網路設定的 DNS 伺服器。
如需有關如何連接兩個 Azure 虛擬網路的詳細資訊,請參閱設定連線。
鏡像結構
此結構會在不同的資源群組和虛擬網路中提供兩個叢集:主要和次要。
建立步驟
建立兩個新的資源群組:
資源群組 Location kafka-primary-rg 美國中部 kafka-secondary-rg 美國中北部 在 kafka-primary-rg 中建立新的虛擬網路 kafka-primary-vnet。 保留預設設定。
在 kafka-secondary-rg 中建立新的虛擬網路 kafka-secondary-vnet,也具有預設設定。
建立兩個新的 Kafka 叢集:
叢集名稱 資源群組 虛擬網路 儲存體帳戶 kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage 建立虛擬網路對等互連。 此步驟將建立兩個對等互連:一個從 kafka-primary-vnet 到 kafka-secondary-vnet,另一個從 kafka-secondary-vnet 回到 kafka-primary-vnet。
選取 kafka-primary-vnet 虛擬網路。
在 [設定] 底下,選取 [對等互連]。
選取 [新增]。
在 [新增對等互連] 畫面上輸入詳細資料,如下列螢幕擷取畫面所示。
設定 IP 通告
設定 IP 通告,讓用戶端能夠使用訊息代理程式 IP 位址 (而不是網域名稱) 連線。
移至主要叢集的 Ambari 儀表板:
https://PRIMARYCLUSTERNAME.azurehdinsight.net
。選取 [服務] > [Kafka]。 選取 [ 設定 ] 索引標籤。
將下列組態行新增至底部的 kafka-env 範本區段。 選取 [儲存]。
# Configure Kafka to advertise IP addresses instead of FQDN IP_ADDRESS=$(hostname -i) echo advertised.listeners=$IP_ADDRESS sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
在 [儲存組態] 畫面上輸入備註,然後選取 [儲存]。
如果您收到組態警告,請選取 [仍要繼續]。
在 [儲存組態變更] 上,選取 [確定]。
在 [需要重新啟動] 通知中,選取 [重新啟動] > [重新啟動所有受影響項目]。 然後選取 [確認重新啟動所有項目]。
設定 Kafka 以在所有網路介面上接聽
- 停留在 [服務] > [Kafka] 下的 [設定] 索引標籤上。 在 [Kafka Broker] 區段中,將 [接聽程式] 屬性設定為
PLAINTEXT://0.0.0.0:9092
。 - 選取 [儲存]。
- 選取 [重新啟動] > [確認全部重新啟動]。
記錄主要叢集的訊息代理程式 IP 位址和 ZooKeeper 位址
選取 Ambari 儀表板上的 [主機]。
記下訊息代理程式和 ZooKeepers 的 IP 位址。 訊息代理程式節點主機名稱的前兩個字母為 wn,而 ZooKeeper 節點主機名稱的前兩個字母則為 zk。
針對第二個叢集 kafka-secondary-cluster 重複上述三個步驟:設定 IP 通告、設定接聽程式,並記下訊息代理程式和 ZooKeeper IP 位址。
建立主題
使用 SSH 連線至主要叢集:
ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
將
sshuser
取代為建立叢集時所使用的 SSH 使用者名稱。 將PRIMARYCLUSTER
取代為建立叢集時所使用的基礎名稱。如需詳細資訊,請參閱搭配 HDInsight 使用 SSH。
使用下列命令來建立兩個環境變數,其中的主要叢集包含 Apache ZooKeeper 主機和訊息代理程式主機。 將
ZOOKEEPER_IP_ADDRESS1
之類的字串取代為先前記錄的實際 IP 位址,例如10.23.0.11
和10.23.0.7
。BROKER_IP_ADDRESS1
也是如此。 如果您使用 FQDN 解析搭配自訂 DNS 伺服器,請遵循這些步驟來取得訊息代理程式和 ZooKeeper 名稱。# get the ZooKeeper hosts for the primary cluster export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181' # get the broker hosts for the primary cluster export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
若要建立名為
testtopic
的主題,請使用下列命令:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
使用下列命令確認已建立主題:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
回應包含
testtopic
。使用下列命令來檢視這個 (主要) 叢集的訊息代理程式主機資訊:
echo $PRIMARY_BROKERHOSTS
此命令會傳回類似以下文字的資訊:
10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092
請儲存此資訊。 此資訊用於下一節。
設定鏡像功能
使用不同的 SSH 工作階段連線至次要叢集:
ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
將
sshuser
取代為建立叢集時所使用的 SSH 使用者名稱。 將SECONDARYCLUSTER
取代為建立叢集時所使用的名稱。如需詳細資訊,請參閱搭配 HDInsight 使用 SSH。
使用
consumer.properties
檔案來設定與主要叢集的通訊。 若要建立檔案,請使用下列命令:nano consumer.properties
使用下列文字做為
consumer.properties
檔案的內容:bootstrap.servers=PRIMARY_BROKERHOSTS group.id=mirrorgroup
將
PRIMARY_BROKERHOSTS
取代為來自主要叢集的訊息代理程式主機 IP 位址。此檔案描述從主要 Kafka 叢集讀取資料時所要使用的取用者資訊。 如需詳細資訊,請參閱位於
kafka.apache.org
的取用者組態。若要儲存檔案,請按 Ctrl+X、按 Y,然後按 Enter。
設定與次要叢集通訊的產生者之前,請先為次要叢集的訊息代理程式 IP 位址設定變數。 使用下列命令來建立此變數:
export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
命令
echo $SECONDARY_BROKERHOSTS
應該會傳回類似下列文字的資訊:10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092
使用
producer.properties
檔案來與次要叢集通訊。 若要建立檔案,請使用下列命令:nano producer.properties
使用下列文字做為
producer.properties
檔案的內容:bootstrap.servers=SECONDARY_BROKERHOSTS compression.type=none
將
SECONDARY_BROKERHOSTS
取代為上一個步驟中使用的訊息代理程式 IP 位址。如需詳細資訊,請參閱位於
kafka.apache.org
的生產者組態。使用下列命令來建立環境變數,其中包含次要叢集 ZooKeeper 主機的 IP 位址:
# get the ZooKeeper hosts for the secondary cluster export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
HDInsight 上 Kafka 的預設組態不允許自動建立主題。 您必須先使用下列其中一個選項,才能啟動鏡像程序:
在次要叢集上建立主題:此選項也可讓您設定分割數目和複寫因子。
您可以使用下列命令提前建立主題:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
將
testtopic
取代為要建立的主題名稱。設定可供自動建立主題的叢集:此選項可讓 MirrorMaker 自動建立主題。 請注意,可能會以與主要主題不同的分割區數目或不同的複寫因數來建立它們。
若要設定次要叢集來自動建立主題,請執行下列步驟:
- 移至次要叢集的 Ambari 儀表板:
https://SECONDARYCLUSTERNAME.azurehdinsight.net
。 - 選取 [服務] > [Kafka]。 然後選取 [設定] 索引標籤。
- 在 [篩選] 欄位中,輸入
auto.create
的值。 這會篩選屬性清單並顯示auto.create.topics.enable
設定。 - 將
auto.create.topics.enable
的值變更為true
,然後選取 [儲存]。 新增附註,然後再次選取 [儲存]。 - 依序選取 [Kafka] 服務、[重新啟動] 和 [重新啟動所有受影響的]。 出現提示時,選取 [確認全部重新啟動]。
- 移至次要叢集的 Ambari 儀表板:
啟動 MirrorMaker
注意
本文會參考 Microsoft 不再使用的術語。 從軟體中移除該字詞時,我們也會將其從本文中移除。
在連往次要叢集的 SSH 連線中,使用下列命令來啟動 MirrorMaker 程序:
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
此範例中使用的參數:
參數 描述 --consumer.config
指定包含取用者屬性的檔案。 您可以使用這些屬性來建立會從主要 Kafka 叢集讀取的取用者。 --producer.config
指定包含產生者屬性的檔案。 您可以使用這些屬性來建立會寫入次要 Kafka 叢集的產生者。 --whitelist
MirrorMaker 從主要叢集複寫至次要叢集的主題清單。 --num.streams
要建立的取用者執行緒數目。 次要節點上的取用者現在正在等候接收訊息。
從連往主要叢集的 SSH 連線中,使用下列命令來啟動產生者,然後傳送訊息到主題:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
當您抵達有游標的空白行時,請輸入一些文字訊息。 訊息會傳送到主要叢集上的主題。 完成後,使用 Ctrl + C 來結束產生者程序。
從連往次要叢集的 SSH 連線,按 Ctrl+C 來結束 MirrorMaker 程序。 這可能需要數秒鐘的時間才能完成程序。 若要驗證訊息已複寫到次要叢集,請使用下列命令:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
主題清單現在包含
testtopic
,它是在 MirrorMaster 將主題從主要叢集鏡像至次要叢集時所建立。 從主題中擷取的訊息與在主要叢集上所輸入的內容相同。
選取叢集
警告
不論使用與否,HDInsight 叢集都是按分鐘計費。 請務必在使用完叢集後將它刪除。 請參閱如何刪除 HDInsight 叢集。
本文中的步驟會在不同的 Azure 資源群組中建立叢集。 若要刪除所有建立的資源,您可以刪除建立的兩個資源群組:kafka-primary-rg 和 kafka-secondary-rg。 刪除資源群組會移除遵循本文所建立的所有資源,包括叢集、虛擬網路和儲存體帳戶。
下一步
在本文中,您已學會如何使用 MirrorMaker 來建立 Apache Kafka 叢集的複本。 使用下列連結來探索使用 Kafka 的其他方式︰