Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
MirrorMaker 2.0 (MM2) wurde zur Vereinfachung der Spiegelung oder Replikation von Themen zwischen Kafka-Clustern konzipiert. Das Feature verwendet das Kafka Connect-Framework, um die Konfiguration und Skalierung zu vereinfachen. Es erkennt dynamisch Änderungen an Themen und stellt sicher, dass Eigenschaften von Quell- und Zielthemen synchronisiert werden, einschließlich Offsets und Partitionen.
In diesem Artikel erfahren Sie, wie Sie Kafka MirrorMaker 2.0 bei der Datenmigration/-replikation sowie in einigen Anwendungsfällen verwenden.
Voraussetzungen
- Umgebung mit mindestens zwei HDI-Kafka-Clustern.
- Kafka-Version höher als 2.4 (HDI 4.0).
- Der Quellcluster sollte Datenpunkte und Themen enthalten, damit Sie verschiedene Features des MirrorMaker 2.0-Replikationsprozesses testen können.
Anwendungsfall
Simulation von MirrorMaker 2.0 zum Replizieren von Datenpunkten/Offsets zwischen zwei Kafka-Clustern in HDInsight. Dieser Fall kann für Szenarien wie einer erforderlichen Datenreplikation zwischen zwei oder mehr Kafka-Clustern beispielsweise für Notfallwiederherstellung, Cloudanpassung, Georeplikation, Datenisolation und Datenaggregation verwendet werden.
Offsetreplikation mit MirrorMaker 2.0
Interne Features von MM2
Das MirrorMaker 2.0-Tool besteht aus verschiedenen Connectors. Diese Connectors sind Kafka Connect-Standardconnectors, die direkt mit Kafka Connect im eigenständigen oder verteilten Modus verwendet werden können.
Die Zusammenfassung des Brokereinrichtungsvorgangs lautet wie folgt:
MirrorSourceConnector:
- Repliziert Remotethemen, Thema-ACLs und Konfigurationen eines einzelnen Quellclusters.
- Gibt Offsetsynchronisierungen an ein internes Thema aus.
MirrorSinkConnector:
- Repliziert Themen aus dem primären Cluster in einen einzelnen Zielcluster.
MirrorCheckpointConnector:
- Erfasst Offsetsynchronisierungen.
- Gibt Prüfpunkte aus, um Failoverpunkte zu aktivieren.
MirrorHeartBeatConnector:
- Sendet Heartbeats an Remotecluster und ermöglicht so die Überwachung des Replikationsprozesses.
Bereitstellung
Das Skript „Connect-mirror-maker.sh“, das im Paket mit der Kafka-Bibliothek enthalten ist, implementiert einen verteilten MM2-Cluster, der die Connect-Worker intern basierend auf einer Konfigurationsdatei verwaltet. Intern erstellt und verarbeitet der MirrorMaker-Treiber Paare aus den jeweiligen Connectors: MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpointConnector und MirrorHeartbeatConnector.
MirrorMaker 2.0 wird gestartet.
./bin/connect-mirror-maker.sh ./config/mirror-maker.properties
Hinweis
Bei Kerberos-fähigen Clustern muss die JAAS-Konfiguration in die KAFKA_OPTS exportiert oder in der MM2-Konfigurationsdatei angegeben werden.
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"
MirrorMaker 2.0-Beispielkonfigurationsdatei
# specify any number of cluster aliases
clusters = source, destination
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
# enable and configure individual replication flows
source->destination.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"
# Setting replication factor of newly created remote topics
replication.factor=3
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
SSL-Konfiguration
Wenn das Setup eine SSL-Konfiguration erfordert:
destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password>
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI
Globale Konfigurationen
Eigenschaft | Standardwert | Beschreibung |
---|---|---|
name | Erforderlich | Name des Connectors, z. B. „us-west->us-east“. |
topics | Leere Zeichenfolge | Regulärer Ausdruck der zu replizierenden Themen, z. B. „topic1, topic2, topic3“. Durch Trennzeichen getrennte Listen werden auch unterstützt. |
topics.blacklist | „..internal, ..replica, __consumer_offsets“ oder Ähnliches | Themen, die aus der Replikation ausgeschlossen werden sollen. |
groups | Leere Zeichenfolge | Regulärer Ausdruck der zu replizierenden Gruppen, z. B. „.*“ |
groups.blacklist | Leere Zeichenfolge | Gruppen, die aus der Replikation ausgeschlossen werden sollen. |
source.cluster.alias | Erforderlich | Der Name des Clusters, der repliziert wird. |
target.cluster.alias | Erforderlich | Der Name des Downstream-Kafka-Clusters. |
source.cluster.bootstrap.servers | Erforderlich | Der Upstreamcluster, der repliziert werden soll. |
target.cluster.bootstrap.servers | Erforderlich | Der Downstreamcluster. |
sync.topic.configs.enabled | true | Gibt an, ob der Quellcluster auf Konfigurationsänderungen überwacht werden soll. |
sync.topic.acls.enabled | true | Gibt an, ob Quellcluster-ACLs auf Änderungen überwacht werden sollen. |
emit.heartbeats.enabled | true | Gibt an, dass der Connector regelmäßig Heartbeats senden soll. |
emit.heartbeats.interval.seconds | true | Die Frequenz der Heartbeats. |
emit.checkpoints.enabled | true | Der Connector sollte regelmäßig Informationen zu Consumeroffsets senden. |
emit.checkpoints.interval.seconds | 5 (Sekunden) | Die Häufigkeit von Prüfpunkten. |
refresh.topics.enabled | true | Der Connector sollte regelmäßig nach neuen Consumergruppen suchen. |
refresh.topics.interval.seconds | 5 (Sekunden) | Die Häufigkeit, mit der der Quellcluster auf neue Verbrauchergruppen überprüft werden soll. |
refresh.groups.enabled | true | Der Connector sollte regelmäßig nach neuen Consumergruppen suchen. |
refresh.groups.interval.seconds | 5 (Sekunden) | Die Häufigkeit, mit der der Quellcluster auf neue Verbrauchergruppen überprüft werden soll. |
readahead.queue.capacity | 500 (Datensätze) | Die Anzahl von Datensätzen, die der Consumer dem Producer voraus sein darf. |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | Zum Nachahmen des veralteten MirrorMaker soll LegacyReplicationPolicy verwendet werden. |
heartbeats.topic.retention.ms | ein Tag | Wird beim ersten Erstellen von Heartbeatthemen verwendet. |
checkpoints.topic.retention.ms | ein Tag | Wird beim ersten Erstellen von Prüfpunktthemen verwendet. |
offset.syncs.topic.retention.ms | max long | Wird beim erstmaligen Erstellen eines Offsetsynchronisierungsthemas verwendet. |
replication.factor | two | Wird beim Erstellen der Remotethemen verwendet. |
Häufig gestellte Fragen
Warum ist nach der Replikation eines Themas ein Unterschied beim letzten Offset im Quell- und Zielcluster festzustellen?
Es ist möglich, dass Datenpunkte des Quellthemas gelöscht wurden und daher die tatsächliche Anzahl von Datensätzen geringer ist als der Wert beim letzten Offset. Dies führt zu dem Unterschied beim letzten Offset zwischen Quell- und Zielcluster nach der Replikation, da die Replikation immer bei „offset-0“ des Zielclusters beginnt.
Wie verhalten sich die Consumer bei der Migration, wenn der Zielcluster eine andere Offsetzuordnung zu Datenpunkten aufweist?
Das MirrorMaker 2.0 MirrorCheckpointConnector-Feature speichert automatisch Offsetprüfpunkte für Consumergruppen im Quellcluster. Jeder Prüfpunkt enthält eine Zuordnung des letzten bestätigten Offsets für jede Gruppe im Quellcluster zum entsprechenden Offset im Zielcluster. Bei der Migration können die Consumer, die mit der Nutzung aus demselben Thema im Zielcluster beginnen, den Empfang von Nachrichten aus dem letzten Offset fortsetzen, den sie im Quellcluster bestätigt haben.
Wie lässt sich der genaue Themenname im Zielcluster beibehalten, da allen replizierten Themen der Quellalias vorangestellt ist?
Dies ist das Standardverhalten in MirrorMaker 2.0, um ein Überschreiben von Daten in komplexen Spiegelungstopologien zu verhindern. Wenn dieses Verhalten angepasst werden soll, muss im Hinblick auf das Replikationsflussdesign und die Themenverwaltung sehr umsichtig vorgegangen werden, um Datenverluste zu vermeiden. Dafür kann eine benutzerdefinierte Replikationsrichtlinienklasse für „replication.policy.class“ verwendet werden.
Warum werden neue interne Themen angezeigt, die in meiner Kafka-Quell- und -Zielplattform erstellt wurden?
Die Connectors erstellen interne MirrorMaker 2.0-Themen, um den Replikationsprozess, die Überwachung, die Offsetzuordnung und die Prüfpunkterstellung nachverfolgen zu können.
Warum erstellt MirrorMaker nur zwei Replikate des Themas im Zielcluster, obwohl die Quelle mehr enthält?
MirrorMaker 2 repliziert den Replikationsfaktor von Themen nicht in Zielcluster. Dies lässt sich über die MM2-Konfiguration steuern, indem die erforderliche Anzahl für „replication.factor“ angegeben wird. Der Standardwert ist 2.
Wie wird die benutzerdefinierte Replikationsrichtlinie in MirrorMaker 2.0 verwendet?
Eine benutzerdefinierte Replikationsrichtlinie kann durch Implementieren der folgenden Schnittstelle erstellt werden.
/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {
/** How to rename remote topics; generally should be like us-west.topic1. */
String formatRemoteTopic(String sourceClusterAlias, String topic);
/** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
Returns null if not a remote topic.
*/
String topicSource(String topic);
/** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
Topics may be replicated multiple hops, so the immediately upstream topic
may itself be a remote topic.
Returns null if not a remote topic.
*/
String upstreamTopic(String topic);
/** The name of the original source-topic, which may have been replicated multiple hops.
Returns the topic if it is not a remote topic.
*/
String originalTopic(String topic);
/** Internal topics are never replicated. */
boolean isInternalTopic(String topic);
}
Die Implementierung muss dem Kafka-Klassenpfad für den Klassenverweis hinzugefügt werden, der für „replication.policy.class“ in den MM2-Eigenschaften verwendet werden soll.
Nächste Schritte
Was ist Apache Kafka in HDInsight?
Referenzen
Apache-Dokumentation: Änderungen bei MirrorMaker 2.0
Einrichten von Clientzertifikaten für HDI Kafka