Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Program MirrorMaker 2.0 (MM2) został zaprojektowany tak, aby ułatwić dublowanie lub replikowanie tematów z jednego klastra platformy Kafka do innego. Używa platformy Kafka Connect, aby uprościć konfigurację i skalowanie. Dynamicznie wykrywa zmiany tematów i zapewnia synchronizację właściwości tematu źródłowego i docelowego, w tym przesunięcia i partycji.
W tym artykule dowiesz się, jak używać narzędzia Kafka MirrorMaker 2.0 w migracji/replikacji danych i przypadków użycia.
Wymagania wstępne
- Środowisko z co najmniej dwoma klastrami Kafka HDI.
- Wersja platformy Kafka wyższa niż 2.4 (HDI 4.0)
- Klaster źródłowy powinien zawierać punkty danych i tematy, aby przetestować różne funkcje procesu replikacji MirrorMaker 2.0
Przypadek użycia
Symulacja narzędzia MirrorMaker 2.0 w celu replikowania punktów danych/przesunięć między dwoma klastrami platformy Kafka w usłudze HDInsight. To samo może być używane w scenariuszach, takich jak wymagana replikacja danych między co najmniej dwoma klastrami platformy Kafka, takimi jak odzyskiwanie po awarii, adaptacja w chmurze, replikacja geograficzna, izolacja danych i agregacja danych.
Replikacja przesunięcia za pomocą narzędzia MirrorMaker 2.0
Elementy wewnętrzne MM2
Narzędzie MirrorMaker 2.0 składa się z różnych łączników. Te łączniki to standardowe łączniki platformy Kafka Connect, których można używać bezpośrednio z platformą Kafka Connect w trybie autonomicznym lub rozproszonym.
Podsumowanie procesu konfiguracji brokera jest następujące:
MirrorSourceConnector:
- Replikuje tematy zdalne, listy ACL i konfiguracje tematów pojedynczego klastra źródłowego.
- Emituje synchronizację przesunięcia z tematem wewnętrznym.
MirrorSinkConnector:
- Pobiera dane z klastra podstawowego i replikuje tematy do jednego klastra docelowego.
MirrorCheckpointConnector:
- Używa synchronizacji przesunięcia.
- Emituje punkty kontrolne w celu włączenia punktów trybu failover.
MirrorHeartBeatConnector:
- Emituje pulsy do klastrów zdalnych, umożliwiając monitorowanie procesu replikacji.
Wdrożenie
Connect-mirror-maker.sh, skrypt dołączony do biblioteki Kafka, implementuje rozproszony klaster MM2, który zarządza pracownikami Connect wewnętrznie na podstawie pliku konfiguracji. Sterownik Internally MirrorMaker tworzy i obsługuje pary poszczególnych łączników — MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpoint connector i MirrorHeartbeatConnector.
Uruchom narzędzie MirrorMaker 2.0.
./bin/connect-mirror-maker.sh ./config/mirror-maker.properties
Uwaga
W przypadku klastrów z włączoną obsługą protokołu Kerberos konfiguracja JAAS musi zostać wyeksportowana do KAFKA_OPTS lub musi być określona w pliku konfiguracji MM2.
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"
Przykładowy plik konfiguracji MirrorMaker 2.0
# specify any number of cluster aliases
clusters = source, destination
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
# enable and configure individual replication flows
source->destination.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"
# Setting replication factor of newly created remote topics
replication.factor=3
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
Konfiguracja protokołu SSL
Jeśli konfiguracja wymaga konfiguracji protokołu SSL
destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password>
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI
Konfiguracje globalne
Nieruchomość | Wartość domyślna | opis |
---|---|---|
nazwa | wymagane | nazwa łącznika, na przykład "us-west-us-east>" |
Tematy | pusty ciąg | regex tematów do replikowania, na przykład "topic1, topic2, topic3". Obsługiwane są również listy rozdzielane przecinkami. |
tematy.czarna_lista | ".. wewnętrzne, .. replika, __consumer_offsets" lub podobna | tematy do wykluczenia z replikacji |
grupy | pusty ciąg | regex grup do replikacji, na przykład ".*" |
grupy.czarna_lista | pusty ciąg | grupy do wykluczenia z replikacji |
source.cluster.alias | wymagane | nazwa replikowanego klastra |
target.cluster.alias | wymagane | nazwa podrzędnego klastra platformy Kafka |
source.cluster.bootstrap.servers | wymagane | nadrzędny klaster do replikacji |
target.cluster.bootstrap.servers | wymagane | klaster podrzędny |
opcja.sync.topic.configs.włączone | prawda | czy monitorować klaster źródłowy pod kątem zmian konfiguracji |
sync.topic.acls.enabled | prawda | czy monitorować listy ACL klastra źródłowego pod kątem zmian |
emitowanie.pulsów.włączone | prawda | łącznik powinien okresowo emitować pulsy |
emit.heartbeats.interval.seconds | prawda | częstotliwość pulsów |
emit.punkty kontrolne.włączone | prawda | łącznik powinien okresowo przekazywać informacje o offsetach konsumentów |
emit.przedziały.czasowe.punktów.kontrolnych.sekundy | 5 (sekundy) | częstotliwość punktów kontrolnych |
odświeżanie.tematy.włączone | prawda | łącznik powinien okresowo sprawdzać dostępność nowych grup odbiorców |
odśwież.tematy.interwał.sekundy | 5 (sekundy) | częstotliwość sprawdzania klastra źródłowego dla nowych grup odbiorców |
odśwież.grupy.włączony | prawda | łącznik powinien okresowo sprawdzać dostępność nowych grup odbiorców |
odśwież.interwał.grup.sekundy | 5 (sekundy) | częstotliwość sprawdzania klastra źródłowego dla nowych grup odbiorców |
readahead.queue.capacity | 500 (rekordy) | liczba rekordów, aby umożliwić konsumentowi wyprzedzanie producenta |
klasa.polityki.replikacji | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | użyj LegacyReplicationPolicy, aby naśladować starszego MirrorMaker |
heartbeats.topic.retention.ms | jeden dzień | używany podczas tworzenia tematów pulsu po raz pierwszy |
checkpoints.topic.retention.ms | jeden dzień | używany podczas tworzenia tematów punktów kontrolnych po raz pierwszy |
"offset.syncs.topic.retention.ms" | maksymalna długość | używany podczas tworzenia tematu synchronizacji przesunięcia po raz pierwszy |
czynnik replikacji | dwa | używane podczas tworzenia tematów zdalnych |
Często zadawane pytania
Dlaczego widzimy różnicę w ostatnim przesunięciu na klastrze źródłowym i docelowym po replikacji tematu?
Możliwe, że punkty danych tematu źródłowego mogły zostać usunięte, przez co rzeczywista liczba rekordów będzie mniejsza niż ostatnia wartość przesunięcia. Powoduje to różnicę między ostatnim offsetem w klastrze źródłowym a offsetem w klastrze docelowym po replikacji, ponieważ replikacja zawsze rozpoczyna się od offsetu 0 klastra docelowego.
Jak będą się zachowywać konsumenci podczas migracji, jeśli klaster docelowy może mieć inne odwzorowanie przesunięcia dla punktów danych?
Funkcja MirrorMaker 2.0 MirrorCheckpointConnector automatycznie przechowuje punkty kontrolne przesunięcia grupy odbiorców dla grup odbiorców w klastrze źródłowym. Każdy punkt kontrolny zawiera mapowanie ostatniego zatwierdzonego przesunięcia dla każdej grupy w klastrze źródłowym na równoważne przesunięcie w klastrze docelowym. Tak więc w przypadku migracji konsumenci, którzy zaczynają konsumować ten sam temat w klastrze docelowym, będą mogli wznowić odbieranie komunikatów od ostatniego przesunięcia, które zatwierdzili w klastrze źródłowym.
Jak możemy zachować dokładną nazwę tematu w klastrze docelowym, ponieważ alias źródłowy jest poprzedzony wszystkimi replikowanymi tematami?
Jest to domyślne zachowanie w programie MirrorMaker 2.0, aby uniknąć zastępowania danych w złożonych topologiach dublowania. Aby uniknąć utraty danych, należy dokładnie dostosować go do projektu przepływu replikacji i zarządzania tematami. Można to zrobić przy użyciu niestandardowej klasy zasad replikacji dla "replication.policy.class".
Dlaczego widzimy nowe wewnętrzne tematy tworzone w moim źródle i docelowym Kafka?
Tematy wewnętrzne MirrorMaker 2.0 są tworzone przez konektory, aby śledzić proces replikacji, monitorowanie, mapowanie przesunięć i tworzenie punktów kontrolnych.
Dlaczego program MirrorMaker tworzy tylko dwie repliki tematu w klastrze docelowym, gdy źródło ma więcej?
Program MirrorMaker 2 nie replikuje współczynnika replikacji tematów do klastrów docelowych. Można to kontrolować z poziomu konfiguracji MM2, określając wymaganą liczbę "replication.factor". Wartość domyślna dla tej samej wartości to dwa.
Jak używać niestandardowych zasad replikacji w programie MirrorMaker 2.0?
Zasady replikacji niestandardowej można utworzyć, implementując poniższy interfejs.
/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {
/** How to rename remote topics; generally should be like us-west.topic1. */
String formatRemoteTopic(String sourceClusterAlias, String topic);
/** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
Returns null if not a remote topic.
*/
String topicSource(String topic);
/** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
Topics may be replicated multiple hops, so the immediately upstream topic
may itself be a remote topic.
Returns null if not a remote topic.
*/
String upstreamTopic(String topic);
/** The name of the original source-topic, which may have been replicated multiple hops.
Returns the topic if it is not a remote topic.
*/
String originalTopic(String topic);
/** Internal topics are never replicated. */
boolean isInternalTopic(String topic);
}
Implementację należy dodać do ścieżki klasy platformy Kafka, aby odwołanie do klasy było używane względem właściwości replication.policy.class we właściwościach MM2.
Następne kroki
Co to jest platforma Apache Kafka w usłudze HDInsight?
Odnośniki
MirrorMaker 2.0 zmienia dokument apache
Konfiguracja certyfikatów klienta dla platformy Kafka w usłudze HDI