Replikowanie tematów platformy Apache Kafka przy użyciu platformy Kafka w usłudze HDInsight za pomocą narzędzia MirrorMaker

Dowiedz się, jak używać funkcji dublowania platformy Apache Kafka do replikowania tematów do klastra pomocniczego. Dublowanie można uruchamiać jako proces ciągły lub sporadycznie migrować dane z jednego klastra do innego.

W tym artykule użyjesz dublowania, aby replikować tematy między dwoma klastrami usługi HDInsight. Te klastry znajdują się w różnych sieciach wirtualnych w różnych centrach danych.

Ostrzeżenie

Nie używaj dublowania jako środka w celu osiągnięcia odporności na uszkodzenia. Przesunięcie elementów w temacie różni się między klastrami podstawowymi i pomocniczymi, więc klienci nie mogą używać tych dwóch zamiennie. Jeśli interesuje Cię odporność na uszkodzenia, należy ustawić replikację tematów w klastrze. Aby uzyskać więcej informacji, zobacz Wprowadzenie do platformy Apache Kafka w usłudze HDInsight.

Jak działa dublowanie platformy Apache Kafka

Dublowanie działa przy użyciu narzędzia MirrorMaker , które jest częścią platformy Apache Kafka. Narzędzie MirrorMaker używa rekordów z tematów w klastrze podstawowym, a następnie tworzy lokalną kopię w klastrze pomocniczym. Narzędzie MirrorMaker używa co najmniej jednego użytkownika odczytującego z klastra podstawowego i producenta , który zapisuje w klastrze lokalnym (pomocniczym).

Najbardziej przydatna konfiguracja dublowania na potrzeby odzyskiwania po awarii korzysta z klastrów platformy Kafka w różnych regionach świadczenia usługi Azure. Aby to osiągnąć, sieci wirtualne, w których znajdują się klastry, są połączone za pomocą komunikacji równorzędnej.

Na poniższym diagramie przedstawiono proces dublowania i sposób przepływu komunikacji między klastrami:

Diagram procesu dublowania.

Klastry podstawowe i pomocnicze mogą być różne w liczbie węzłów i partycji, a przesunięcia w tych tematach również różnią się. Dublowanie utrzymuje wartość klucza używaną do partycjonowania, więc kolejność rekordów jest zachowywana na podstawie klucza.

Dublowanie między granicami sieci

Jeśli musisz dublować między klastrami platformy Kafka w różnych sieciach, należy wziąć pod uwagę następujące dodatkowe zagadnienia:

  • Bramy: sieci muszą być w stanie komunikować się na poziomie TCP/IP.

  • Adresowanie serwera: możesz wybrać adresowanie węzłów klastra przy użyciu ich adresów IP lub w pełni kwalifikowanych nazw domen.

    • Adresy IP: jeśli skonfigurujesz klastry platformy Kafka do korzystania z reklam adresów IP, możesz kontynuować konfigurację dublowania przy użyciu adresów IP węzłów brokera i węzłów usługi ZooKeeper.

    • Nazwy domen: jeśli nie skonfigurujesz klastrów platformy Kafka na potrzeby reklamy adresów IP, klastry muszą mieć możliwość łączenia się ze sobą przy użyciu w pełni kwalifikowanych nazw domen (FQDN). Wymaga to serwera systemu nazw domen (DNS) w każdej sieci skonfigurowanej do przekazywania żądań do innych sieci. Podczas tworzenia sieci wirtualnej platformy Azure zamiast używania automatycznego systemu DNS dostarczonego z siecią należy określić niestandardowy serwer DNS i adres IP serwera. Po utworzeniu sieci wirtualnej należy utworzyć maszynę wirtualną platformy Azure korzystającą z tego adresu IP. Następnie zainstalujesz i skonfigurujesz na nim oprogramowanie DNS.

    Ważne

    Utwórz i skonfiguruj niestandardowy serwer DNS przed zainstalowaniem usługi HDInsight w sieci wirtualnej. Nie ma dodatkowej konfiguracji wymaganej dla usługi HDInsight do korzystania z serwera DNS skonfigurowanego dla sieci wirtualnej.

Aby uzyskać więcej informacji na temat łączenia dwóch sieci wirtualnych platformy Azure, zobacz Konfigurowanie połączenia.

Architektura dublowania

Ta architektura obejmuje dwa klastry w różnych grupach zasobów i sieciach wirtualnych: podstawowym i pomocniczym.

Kroki tworzenia

  1. Utwórz dwie nowe grupy zasobów:

    Grupa zasobów Lokalizacja
    kafka-primary-rg Central US
    kafka-secondary-rg Północno-środkowe stany USA
  2. Utwórz nową sieć wirtualną kafka-primary-vnet na platformie kafka-primary-rg. Pozostaw ustawienia domyślne.

  3. Utwórz nową sieć wirtualną kafka-secondary-vnet na platformie kafka-secondary-rg, również z ustawieniami domyślnymi.

  4. Utwórz dwa nowe klastry platformy Kafka:

    Nazwa klastra Grupa zasobów Sieć wirtualna Konto magazynu
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. Tworzenie komunikacji równorzędnej sieci wirtualnych. W tym kroku zostaną utworzone dwie komunikacje równorzędne: jedna z platformy kafka-primary-vnet do sieci wirtualnej kafka-secondary-vnet i jedna z powrotem z sieci kafka-secondary-vnet do sieci kafka-primary-vnet.

    1. Wybierz sieć wirtualną kafka-primary-vnet .

    2. W obszarze Ustawienia wybierz pozycję Komunikacja równorzędna.

    3. Wybierz pozycję Dodaj.

    4. Na ekranie Dodawanie komunikacji równorzędnej wprowadź szczegóły, jak pokazano na poniższym zrzucie ekranu.

      Zrzut ekranu przedstawiający dodawanie komunikacji równorzędnej sieci wirtualnych przez platformę Kafka usługi H D Insight.

Konfigurowanie reklam IP

Skonfiguruj reklamy IP, aby umożliwić klientowi nawiązywanie połączenia przy użyciu adresów IP brokera zamiast nazw domen.

  1. Przejdź do pulpitu nawigacyjnego systemu Ambari dla klastra podstawowego: https://PRIMARYCLUSTERNAME.azurehdinsight.net.

  2. Wybierz pozycję Usługi>Kafka. Wybierz kartę Configs (Konfiguracje ).

  3. Dodaj następujące wiersze konfiguracji do dolnej sekcji szablonu kafka-env . Wybierz pozycję Zapisz.

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. Wprowadź notatkę na ekranie Zapisz konfigurację , a następnie wybierz pozycję Zapisz.

  5. Jeśli zostanie wyświetlone ostrzeżenie o konfiguracji, wybierz pozycję Kontynuuj mimo to.

  6. W obszarze Zapisz zmiany konfiguracji wybierz przycisk OK.

  7. W powiadomieniu Wymagane ponowne uruchomienie wybierz pozycję Uruchom ponownie ponownie>wszystkie, których dotyczy problem. Następnie wybierz pozycję Potwierdź ponownie uruchom wszystko.

    Zrzut ekranu przedstawiający opcję Apache Ambari w celu ponownego uruchomienia wszystkich, których dotyczy problem.

Konfigurowanie platformy Kafka do nasłuchiwania we wszystkich interfejsach sieciowych

  1. Na karcie Configs (Konfiguracje ) w obszarze Services Kafka ( Usługi>Kafka). W sekcji Broker platformy Kafka ustaw właściwość odbiorników na PLAINTEXT://0.0.0.0:9092.
  2. Wybierz pozycję Zapisz.
  3. Wybierz pozycję Uruchom ponownie, potwierdź ponowne uruchomienie> wszystkich.

Rejestrowanie adresów IP brokera i adresów ZooKeeper dla klastra podstawowego

  1. Wybierz pozycję Hosty na pulpicie nawigacyjnym systemu Ambari.

  2. Zanotuj adresy IP brokerów i zooKeepers. Węzły brokera mają wn jako pierwsze dwie litery nazwy hosta, a węzły ZooKeeper mają zk jako pierwsze dwie litery nazwy hosta.

    Zrzut ekranu przedstawiający adresy p węzła widoku systemu Apache Ambari.

  3. Powtórz poprzednie trzy kroki dla drugiego klastra, kafka-secondary-cluster: konfigurowanie reklamy IP, ustawianie odbiorników i zanotuj adresy IP brokera i zooKeeper.

Tworzenie tematów

  1. Połącz się z klastrem podstawowym przy użyciu protokołu SSH:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    

    Zastąp ciąg sshuser nazwą użytkownika SSH użytą podczas tworzenia klastra. Zastąp PRIMARYCLUSTER ciąg nazwą podstawową użytą podczas tworzenia klastra.

    Aby uzyskać więcej informacji, zobacz Używanie protokołu SSH w usłudze HDInsight.

  2. Użyj następującego polecenia, aby utworzyć dwie zmienne środowiskowe z hostami usługi Apache ZooKeeper i hostami brokera dla klastra podstawowego. Zastąp ciągi takie jak ZOOKEEPER_IP_ADDRESS1 rzeczywistymi zarejestrowanymi wcześniej adresami IP, takimi jak 10.23.0.11 i 10.23.0.7. To samo dotyczy BROKER_IP_ADDRESS1. Jeśli używasz rozpoznawania nazw FQDN z niestandardowym serwerem DNS, wykonaj następujące kroki , aby uzyskać nazwy brokera i usługi ZooKeeper.

    # get the ZooKeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
    
    # get the broker hosts for the primary cluster
    export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    
  3. Aby utworzyć temat o nazwie testtopic, użyj następującego polecenia:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. Użyj następującego polecenia, aby sprawdzić, czy temat został utworzony:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    Odpowiedź zawiera .testtopic

  5. Użyj następujących informacji, aby wyświetlić informacje o hoście brokera dla tego klastra (podstawowego):

    echo $PRIMARY_BROKERHOSTS
    

    Zwraca to informacje podobne do następującego tekstu:

    10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092

    Zapisz te informacje. Jest on używany w następnej sekcji.

Konfigurowanie dublowania

  1. Połącz się z klastrem pomocniczym przy użyciu innej sesji SSH:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
    

    Zastąp ciąg sshuser nazwą użytkownika SSH użytą podczas tworzenia klastra. Zastąp SECONDARYCLUSTER ciąg nazwą użytą podczas tworzenia klastra.

    Aby uzyskać więcej informacji, zobacz Używanie protokołu SSH w usłudze HDInsight.

  2. Użyj pliku, aby skonfigurować komunikację consumer.properties z klastrem podstawowym. Aby utworzyć plik, użyj następującego polecenia:

    nano consumer.properties
    

    Użyj następującego tekstu jako zawartości consumer.properties pliku:

    bootstrap.servers=PRIMARY_BROKERHOSTS
    group.id=mirrorgroup
    

    Zastąp ciąg PRIMARY_BROKERHOSTS adresami IP hosta brokera z klastra podstawowego.

    W tym pliku opisano informacje o użytkowniku używane podczas odczytywania z podstawowego klastra platformy Kafka. Aby uzyskać więcej informacji, zobacz Konfiguracje konsumentów pod adresem kafka.apache.org.

    Aby zapisać plik, naciśnij klawisze Ctrl+X, naciśnij klawisz Y, a następnie naciśnij klawisz Enter.

  3. Przed skonfigurowaniem producenta komunikującego się z klastrem pomocniczym skonfiguruj zmienną dla adresów IP brokera klastra pomocniczego. Użyj następujących poleceń, aby utworzyć tę zmienną:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    

    Polecenie echo $SECONDARY_BROKERHOSTS powinno zwrócić informacje podobne do następującego tekstu:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. producer.properties Użyj pliku do komunikowania się z klastrem pomocniczym. Aby utworzyć plik, użyj następującego polecenia:

    nano producer.properties
    

    Użyj następującego tekstu jako zawartości producer.properties pliku:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    Zastąp ciąg SECONDARY_BROKERHOSTS adresami IP brokera używanymi w poprzednim kroku.

    Aby uzyskać więcej informacji, zobacz Konfiguracja producenta pod adresem kafka.apache.org.

  5. Użyj następujących poleceń, aby utworzyć zmienną środowiskową z adresami IP hostów zooKeeper dla klastra pomocniczego:

    # get the ZooKeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. Domyślna konfiguracja platformy Kafka w usłudze HDInsight nie zezwala na automatyczne tworzenie tematów. Przed rozpoczęciem procesu dublowania należy użyć jednej z następujących opcji:

    • Utwórz tematy w klastrze pomocniczym: ta opcja umożliwia również ustawienie liczby partycji i współczynnika replikacji.

      Tematy można tworzyć z wyprzedzeniem przy użyciu następującego polecenia:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      Zastąp testtopic ciąg nazwą tematu do utworzenia.

    • Skonfiguruj klaster na potrzeby automatycznego tworzenia tematu: ta opcja umożliwia narzędzie MirrorMaker automatyczne tworzenie tematów. Należy pamiętać, że może utworzyć je z inną liczbą partycji lub innym czynnikiem replikacji niż temat podstawowy.

      Aby skonfigurować klaster pomocniczy do automatycznego tworzenia tematów, wykonaj następujące kroki:

      1. Przejdź do pulpitu nawigacyjnego systemu Ambari dla klastra pomocniczego: https://SECONDARYCLUSTERNAME.azurehdinsight.net.
      2. Wybierz pozycję Usługi>Kafka. Następnie wybierz kartę Configs (Konfiguracje ).
      3. W polu Filtr wprowadź wartość .auto.create Spowoduje to filtrowanie listy właściwości i wyświetlanie auto.create.topics.enable ustawienia.
      4. Zmień wartość na auto.create.topics.enabletrue, a następnie wybierz pozycję Zapisz. Dodaj notatkę, a następnie ponownie wybierz pozycję Zapisz .
      5. Wybierz usługę Kafka , wybierz pozycję Uruchom ponownie, a następnie wybierz pozycję Uruchom ponownie wszystkie, których dotyczy problem. Po wyświetleniu monitu wybierz pozycję Potwierdź ponowne uruchomienie wszystkich.

      Zrzut ekranu przedstawiający sposób włączania automatycznego tworzenia tematów w usłudze kafka.

Uruchom narzędzie MirrorMaker

Uwaga

Ten artykuł zawiera odwołania do terminu, którego firma Microsoft już nie używa. Po usunięciu terminu z oprogramowania usuniemy go z tego artykułu.

  1. Z połączenia SSH z klastrem pomocniczym użyj następującego polecenia, aby uruchomić proces MirrorMaker:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    Parametry używane w tym przykładzie to:

    Parametr Opis
    --consumer.config Określa plik zawierający właściwości konsumenta. Te właściwości służą do tworzenia konsumenta, który odczytuje z podstawowego klastra platformy Kafka.
    --producer.config Określa plik zawierający właściwości producenta. Te właściwości służą do tworzenia producenta, który zapisuje w pomocniczym klastrze platformy Kafka.
    --whitelist Lista tematów replikowanych z klastra podstawowego do pomocniczego programu MirrorMaker.
    --num.streams Liczba wątków konsumenta do utworzenia.

    Użytkownik w węźle pomocniczym oczekuje teraz na odbieranie komunikatów.

  2. Z poziomu połączenia SSH z klastrem podstawowym użyj następującego polecenia, aby uruchomić producenta i wysłać komunikaty do tematu:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    Po nadejściu pustego wiersza z kursorem wpisz kilka wiadomości tekstowych. Komunikaty są wysyłane do tematu w klastrze podstawowym. Po zakończeniu naciśnij klawisze Ctrl+C, aby zakończyć proces producenta.

  3. Z połączenia SSH z klastrem pomocniczym naciśnij klawisze Ctrl+C, aby zakończyć proces MirrorMaker. Zakończenie procesu może potrwać kilka sekund. Aby sprawdzić, czy komunikaty zostały zreplikowane do pomocniczej, użyj następującego polecenia:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    Lista tematów zawiera testtopicteraz element , który jest tworzony podczas dublowania tematu z klastra podstawowego do pomocniczego. Komunikaty pobrane z tematu są takie same jak komunikaty wprowadzone w klastrze podstawowym.

Usuwanie klastra

Ostrzeżenie

Rozliczenia dla klastrów usługi HDInsight są naliczane proporcjonalnie na minutę, niezależnie od tego, czy są używane. Pamiętaj, aby usunąć klaster po zakończeniu korzystania z niego. Zobacz , jak usunąć klaster usługi HDInsight.

Kroki opisane w tym artykule zostały utworzone klastry w różnych grupach zasobów platformy Azure. Aby usunąć wszystkie utworzone zasoby, możesz usunąć dwie utworzone grupy zasobów: kafka-primary-rg i kafka-secondary-rg. Usunięcie grup zasobów powoduje usunięcie wszystkich zasobów utworzonych w tym artykule, w tym klastrów, sieci wirtualnych i kont magazynu.

Następne kroki

W tym artykule przedstawiono sposób tworzenia repliki klastra Apache Kafka przy użyciu narzędzia MirrorMaker. Skorzystaj z poniższych linków, aby odnaleźć inne sposoby pracy z platformą Kafka: