Udostępnij za pośrednictwem


Jak używać narzędzia Kafka MirrorMaker 2.0 w migracji danych, replikacji i przypadków użycia

Program MirrorMaker 2.0 (MM2) został zaprojektowany tak, aby ułatwić dublowanie lub replikowanie tematów z jednego klastra platformy Kafka do innego. Używa platformy Kafka Connect, aby uprościć konfigurację i skalowanie. Dynamicznie wykrywa zmiany tematów i zapewnia synchronizację właściwości tematu źródłowego i docelowego, w tym przesunięcia i partycji.

W tym artykule dowiesz się, jak używać narzędzia Kafka MirrorMaker 2.0 w migracji/replikacji danych i przypadków użycia.

Wymagania wstępne

  • Środowisko z co najmniej dwoma klastrami Kafka HDI.
  • Wersja platformy Kafka wyższa niż 2.4 (HDI 4.0)
  • Klaster źródłowy powinien zawierać punkty danych i tematy, aby przetestować różne funkcje procesu replikacji MirrorMaker 2.0

Przypadek użycia

Symulacja narzędzia MirrorMaker 2.0 w celu replikowania punktów danych/przesunięć między dwoma klastrami platformy Kafka w usłudze HDInsight. To samo może być używane w scenariuszach, takich jak wymagana replikacja danych między co najmniej dwoma klastrami platformy Kafka, takimi jak odzyskiwanie po awarii, adaptacja w chmurze, replikacja geograficzna, izolacja danych i agregacja danych.

Replikacja przesunięcia za pomocą narzędzia MirrorMaker 2.0

Elementy wewnętrzne MM2

Narzędzie MirrorMaker 2.0 składa się z różnych łączników. Te łączniki to standardowe łączniki platformy Kafka Connect, których można używać bezpośrednio z platformą Kafka Connect w trybie autonomicznym lub rozproszonym.

Podsumowanie procesu konfiguracji brokera jest następujące:

MirrorSourceConnector:

  1. Replikuje tematy zdalne, listy ACL i konfiguracje tematów pojedynczego klastra źródłowego.
  2. Emituje synchronizację przesunięcia z tematem wewnętrznym.

MirrorSinkConnector:

  1. Pobiera dane z klastra podstawowego i replikuje tematy do jednego klastra docelowego.

MirrorCheckpointConnector:

  1. Używa synchronizacji przesunięcia.
  2. Emituje punkty kontrolne w celu włączenia punktów trybu failover.

MirrorHeartBeatConnector:

  1. Emituje pulsy do klastrów zdalnych, umożliwiając monitorowanie procesu replikacji.

Wdrożenie

  1. Connect-mirror-maker.sh, skrypt dołączony do biblioteki Kafka, implementuje rozproszony klaster MM2, który zarządza pracownikami Connect wewnętrznie na podstawie pliku konfiguracji. Sterownik Internally MirrorMaker tworzy i obsługuje pary poszczególnych łączników — MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpoint connector i MirrorHeartbeatConnector.

  2. Uruchom narzędzie MirrorMaker 2.0.

    ./bin/connect-mirror-maker.sh ./config/mirror-maker.properties    
    

Uwaga

W przypadku klastrów z włączoną obsługą protokołu Kerberos konfiguracja JAAS musi zostać wyeksportowana do KAFKA_OPTS lub musi być określona w pliku konfiguracji MM2.

export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"    

Przykładowy plik konfiguracji MirrorMaker 2.0

# specify any number of cluster aliases
clusters = source, destination

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092"  and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092

# enable and configure individual replication flows
source->destination.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"

# Setting replication factor of newly created remote topics
replication.factor=3

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1    

Konfiguracja protokołu SSL

Jeśli konfiguracja wymaga konfiguracji protokołu SSL

destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password> 
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI

Konfiguracje globalne

Nieruchomość Wartość domyślna opis
nazwa wymagane nazwa łącznika, na przykład "us-west-us-east>"
Tematy pusty ciąg regex tematów do replikowania, na przykład "topic1, topic2, topic3". Obsługiwane są również listy rozdzielane przecinkami.
tematy.czarna_lista ".. wewnętrzne, .. replika, __consumer_offsets" lub podobna tematy do wykluczenia z replikacji
grupy pusty ciąg regex grup do replikacji, na przykład ".*"
grupy.czarna_lista pusty ciąg grupy do wykluczenia z replikacji
source.cluster.alias wymagane nazwa replikowanego klastra
target.cluster.alias wymagane nazwa podrzędnego klastra platformy Kafka
source.cluster.bootstrap.servers wymagane nadrzędny klaster do replikacji
target.cluster.bootstrap.servers wymagane klaster podrzędny
opcja.sync.topic.configs.włączone prawda czy monitorować klaster źródłowy pod kątem zmian konfiguracji
sync.topic.acls.enabled prawda czy monitorować listy ACL klastra źródłowego pod kątem zmian
emitowanie.pulsów.włączone prawda łącznik powinien okresowo emitować pulsy
emit.heartbeats.interval.seconds prawda częstotliwość pulsów
emit.punkty kontrolne.włączone prawda łącznik powinien okresowo przekazywać informacje o offsetach konsumentów
emit.przedziały.czasowe.punktów.kontrolnych.sekundy 5 (sekundy) częstotliwość punktów kontrolnych
odświeżanie.tematy.włączone prawda łącznik powinien okresowo sprawdzać dostępność nowych grup odbiorców
odśwież.tematy.interwał.sekundy 5 (sekundy) częstotliwość sprawdzania klastra źródłowego dla nowych grup odbiorców
odśwież.grupy.włączony prawda łącznik powinien okresowo sprawdzać dostępność nowych grup odbiorców
odśwież.interwał.grup.sekundy 5 (sekundy) częstotliwość sprawdzania klastra źródłowego dla nowych grup odbiorców
readahead.queue.capacity 500 (rekordy) liczba rekordów, aby umożliwić konsumentowi wyprzedzanie producenta
klasa.polityki.replikacji org.apache.kafka.connect.mirror.DefaultReplicationPolicy użyj LegacyReplicationPolicy, aby naśladować starszego MirrorMaker
heartbeats.topic.retention.ms jeden dzień używany podczas tworzenia tematów pulsu po raz pierwszy
checkpoints.topic.retention.ms jeden dzień używany podczas tworzenia tematów punktów kontrolnych po raz pierwszy
"offset.syncs.topic.retention.ms" maksymalna długość używany podczas tworzenia tematu synchronizacji przesunięcia po raz pierwszy
czynnik replikacji dwa używane podczas tworzenia tematów zdalnych

Często zadawane pytania

Dlaczego widzimy różnicę w ostatnim przesunięciu na klastrze źródłowym i docelowym po replikacji tematu?

Możliwe, że punkty danych tematu źródłowego mogły zostać usunięte, przez co rzeczywista liczba rekordów będzie mniejsza niż ostatnia wartość przesunięcia. Powoduje to różnicę między ostatnim offsetem w klastrze źródłowym a offsetem w klastrze docelowym po replikacji, ponieważ replikacja zawsze rozpoczyna się od offsetu 0 klastra docelowego.

Jak będą się zachowywać konsumenci podczas migracji, jeśli klaster docelowy może mieć inne odwzorowanie przesunięcia dla punktów danych?

Funkcja MirrorMaker 2.0 MirrorCheckpointConnector automatycznie przechowuje punkty kontrolne przesunięcia grupy odbiorców dla grup odbiorców w klastrze źródłowym. Każdy punkt kontrolny zawiera mapowanie ostatniego zatwierdzonego przesunięcia dla każdej grupy w klastrze źródłowym na równoważne przesunięcie w klastrze docelowym. Tak więc w przypadku migracji konsumenci, którzy zaczynają konsumować ten sam temat w klastrze docelowym, będą mogli wznowić odbieranie komunikatów od ostatniego przesunięcia, które zatwierdzili w klastrze źródłowym.

Jak możemy zachować dokładną nazwę tematu w klastrze docelowym, ponieważ alias źródłowy jest poprzedzony wszystkimi replikowanymi tematami?

Jest to domyślne zachowanie w programie MirrorMaker 2.0, aby uniknąć zastępowania danych w złożonych topologiach dublowania. Aby uniknąć utraty danych, należy dokładnie dostosować go do projektu przepływu replikacji i zarządzania tematami. Można to zrobić przy użyciu niestandardowej klasy zasad replikacji dla "replication.policy.class".

Dlaczego widzimy nowe wewnętrzne tematy tworzone w moim źródle i docelowym Kafka?

Tematy wewnętrzne MirrorMaker 2.0 są tworzone przez konektory, aby śledzić proces replikacji, monitorowanie, mapowanie przesunięć i tworzenie punktów kontrolnych.

Dlaczego program MirrorMaker tworzy tylko dwie repliki tematu w klastrze docelowym, gdy źródło ma więcej?

Program MirrorMaker 2 nie replikuje współczynnika replikacji tematów do klastrów docelowych. Można to kontrolować z poziomu konfiguracji MM2, określając wymaganą liczbę "replication.factor". Wartość domyślna dla tej samej wartości to dwa.

Jak używać niestandardowych zasad replikacji w programie MirrorMaker 2.0?

Zasady replikacji niestandardowej można utworzyć, implementując poniższy interfejs.

/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {

    /** How to rename remote topics; generally should be like us-west.topic1. */
    String formatRemoteTopic(String sourceClusterAlias, String topic);
 
    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
        Returns null if not a remote topic.
    */
    String topicSource(String topic);
 
    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
        Topics may be replicated multiple hops, so the immediately upstream topic
        may itself be a remote topic.
        Returns null if not a remote topic.
    */
    String upstreamTopic(String topic);
 
    /** The name of the original source-topic, which may have been replicated multiple hops.
        Returns the topic if it is not a remote topic.
    */
    String originalTopic(String topic);
 
    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

Implementację należy dodać do ścieżki klasy platformy Kafka, aby odwołanie do klasy było używane względem właściwości replication.policy.class we właściwościach MM2.

Następne kroki

Co to jest platforma Apache Kafka w usłudze HDInsight?

Odnośniki

MirrorMaker 2.0 zmienia dokument apache

Konfiguracja certyfikatów klienta dla platformy Kafka w usłudze HDI

HDInsight Kafka

Dokumentacja platformy Apache Kafka 2.4

Łączenie sieci lokalnej z platformą Azure