Freigeben über


So verwenden Sie Kafka MirrorMaker 2.0 bei der Datenmigration und -replikation – Anwendungsfälle.

MirrorMaker 2.0 (MM2) wurde zur Vereinfachung der Spiegelung oder Replikation von Themen zwischen Kafka-Clustern konzipiert. Das Feature verwendet das Kafka Connect-Framework, um die Konfiguration und Skalierung zu vereinfachen. Es erkennt dynamisch Änderungen an Themen und stellt sicher, dass Eigenschaften von Quell- und Zielthemen synchronisiert werden, einschließlich Offsets und Partitionen.

In diesem Artikel erfahren Sie, wie Sie Kafka MirrorMaker 2.0 bei der Datenmigration/-replikation sowie in einigen Anwendungsfällen verwenden.

Voraussetzungen

  • Umgebung mit mindestens zwei HDI-Kafka-Clustern.
  • Kafka-Version höher als 2.4 (HDI 4.0).
  • Der Quellcluster sollte Datenpunkte und Themen enthalten, damit Sie verschiedene Features des MirrorMaker 2.0-Replikationsprozesses testen können.

Anwendungsfall

Simulation von MirrorMaker 2.0 zum Replizieren von Datenpunkten/Offsets zwischen zwei Kafka-Clustern in HDInsight. Dieser Fall kann für Szenarien wie einer erforderlichen Datenreplikation zwischen zwei oder mehr Kafka-Clustern beispielsweise für Notfallwiederherstellung, Cloudanpassung, Georeplikation, Datenisolation und Datenaggregation verwendet werden.

Offsetreplikation mit MirrorMaker 2.0

Interne Features von MM2

Das MirrorMaker 2.0-Tool besteht aus verschiedenen Connectors. Diese Connectors sind Kafka Connect-Standardconnectors, die direkt mit Kafka Connect im eigenständigen oder verteilten Modus verwendet werden können.

Die Zusammenfassung des Brokereinrichtungsvorgangs lautet wie folgt:

MirrorSourceConnector:

  1. Repliziert Remotethemen, Thema-ACLs und Konfigurationen eines einzelnen Quellclusters.
  2. Gibt Offsetsynchronisierungen an ein internes Thema aus.

MirrorSinkConnector:

  1. Repliziert Themen aus dem primären Cluster in einen einzelnen Zielcluster.

MirrorCheckpointConnector:

  1. Erfasst Offsetsynchronisierungen.
  2. Gibt Prüfpunkte aus, um Failoverpunkte zu aktivieren.

MirrorHeartBeatConnector:

  1. Sendet Heartbeats an Remotecluster und ermöglicht so die Überwachung des Replikationsprozesses.

Bereitstellung

  1. Das Skript „Connect-mirror-maker.sh“, das im Paket mit der Kafka-Bibliothek enthalten ist, implementiert einen verteilten MM2-Cluster, der die Connect-Worker intern basierend auf einer Konfigurationsdatei verwaltet. Intern erstellt und verarbeitet der MirrorMaker-Treiber Paare aus den jeweiligen Connectors: MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpointConnector und MirrorHeartbeatConnector.

  2. MirrorMaker 2.0 wird gestartet.

    ./bin/connect-mirror-maker.sh ./config/mirror-maker.properties    
    

Hinweis

Bei Kerberos-fähigen Clustern muss die JAAS-Konfiguration in die KAFKA_OPTS exportiert oder in der MM2-Konfigurationsdatei angegeben werden.

export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"    

MirrorMaker 2.0-Beispielkonfigurationsdatei

# specify any number of cluster aliases
clusters = source, destination

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092"  and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092

# enable and configure individual replication flows
source->destination.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"

# Setting replication factor of newly created remote topics
replication.factor=3

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1    

SSL-Konfiguration

Wenn das Setup eine SSL-Konfiguration erfordert:

destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password> 
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI

Globale Konfigurationen

Eigenschaft Standardwert Beschreibung
name Erforderlich Name des Connectors, z. B. „us-west->us-east“.
topics Leere Zeichenfolge Regulärer Ausdruck der zu replizierenden Themen, z. B. „topic1, topic2, topic3“. Durch Trennzeichen getrennte Listen werden auch unterstützt.
topics.blacklist „..internal, ..replica, __consumer_offsets“ oder Ähnliches Themen, die aus der Replikation ausgeschlossen werden sollen.
groups Leere Zeichenfolge Regulärer Ausdruck der zu replizierenden Gruppen, z. B. „.*“
groups.blacklist Leere Zeichenfolge Gruppen, die aus der Replikation ausgeschlossen werden sollen.
source.cluster.alias Erforderlich Der Name des Clusters, der repliziert wird.
target.cluster.alias Erforderlich Der Name des Downstream-Kafka-Clusters.
source.cluster.bootstrap.servers Erforderlich Der Upstreamcluster, der repliziert werden soll.
target.cluster.bootstrap.servers Erforderlich Der Downstreamcluster.
sync.topic.configs.enabled true Gibt an, ob der Quellcluster auf Konfigurationsänderungen überwacht werden soll.
sync.topic.acls.enabled true Gibt an, ob Quellcluster-ACLs auf Änderungen überwacht werden sollen.
emit.heartbeats.enabled true Gibt an, dass der Connector regelmäßig Heartbeats senden soll.
emit.heartbeats.interval.seconds true Die Frequenz der Heartbeats.
emit.checkpoints.enabled true Der Connector sollte regelmäßig Informationen zu Consumeroffsets senden.
emit.checkpoints.interval.seconds 5 (Sekunden) Die Häufigkeit von Prüfpunkten.
refresh.topics.enabled true Der Connector sollte regelmäßig nach neuen Consumergruppen suchen.
refresh.topics.interval.seconds 5 (Sekunden) Die Häufigkeit, mit der der Quellcluster auf neue Verbrauchergruppen überprüft werden soll.
refresh.groups.enabled true Der Connector sollte regelmäßig nach neuen Consumergruppen suchen.
refresh.groups.interval.seconds 5 (Sekunden) Die Häufigkeit, mit der der Quellcluster auf neue Verbrauchergruppen überprüft werden soll.
readahead.queue.capacity 500 (Datensätze) Die Anzahl von Datensätzen, die der Consumer dem Producer voraus sein darf.
replication.policy.class org.apache.kafka.connect.mirror.DefaultReplicationPolicy Zum Nachahmen des veralteten MirrorMaker soll LegacyReplicationPolicy verwendet werden.
heartbeats.topic.retention.ms ein Tag Wird beim ersten Erstellen von Heartbeatthemen verwendet.
checkpoints.topic.retention.ms ein Tag Wird beim ersten Erstellen von Prüfpunktthemen verwendet.
offset.syncs.topic.retention.ms max long Wird beim erstmaligen Erstellen eines Offsetsynchronisierungsthemas verwendet.
replication.factor two Wird beim Erstellen der Remotethemen verwendet.

Häufig gestellte Fragen

Warum ist nach der Replikation eines Themas ein Unterschied beim letzten Offset im Quell- und Zielcluster festzustellen?

Es ist möglich, dass Datenpunkte des Quellthemas gelöscht wurden und daher die tatsächliche Anzahl von Datensätzen geringer ist als der Wert beim letzten Offset. Dies führt zu dem Unterschied beim letzten Offset zwischen Quell- und Zielcluster nach der Replikation, da die Replikation immer bei „offset-0“ des Zielclusters beginnt.

Wie verhalten sich die Consumer bei der Migration, wenn der Zielcluster eine andere Offsetzuordnung zu Datenpunkten aufweist?

Das MirrorMaker 2.0 MirrorCheckpointConnector-Feature speichert automatisch Offsetprüfpunkte für Consumergruppen im Quellcluster. Jeder Prüfpunkt enthält eine Zuordnung des letzten bestätigten Offsets für jede Gruppe im Quellcluster zum entsprechenden Offset im Zielcluster. Bei der Migration können die Consumer, die mit der Nutzung aus demselben Thema im Zielcluster beginnen, den Empfang von Nachrichten aus dem letzten Offset fortsetzen, den sie im Quellcluster bestätigt haben.

Wie lässt sich der genaue Themenname im Zielcluster beibehalten, da allen replizierten Themen der Quellalias vorangestellt ist?

Dies ist das Standardverhalten in MirrorMaker 2.0, um ein Überschreiben von Daten in komplexen Spiegelungstopologien zu verhindern. Wenn dieses Verhalten angepasst werden soll, muss im Hinblick auf das Replikationsflussdesign und die Themenverwaltung sehr umsichtig vorgegangen werden, um Datenverluste zu vermeiden. Dafür kann eine benutzerdefinierte Replikationsrichtlinienklasse für „replication.policy.class“ verwendet werden.

Warum werden neue interne Themen angezeigt, die in meiner Kafka-Quell- und -Zielplattform erstellt wurden?

Die Connectors erstellen interne MirrorMaker 2.0-Themen, um den Replikationsprozess, die Überwachung, die Offsetzuordnung und die Prüfpunkterstellung nachverfolgen zu können.

Warum erstellt MirrorMaker nur zwei Replikate des Themas im Zielcluster, obwohl die Quelle mehr enthält?

MirrorMaker 2 repliziert den Replikationsfaktor von Themen nicht in Zielcluster. Dies lässt sich über die MM2-Konfiguration steuern, indem die erforderliche Anzahl für „replication.factor“ angegeben wird. Der Standardwert ist 2.

Wie wird die benutzerdefinierte Replikationsrichtlinie in MirrorMaker 2.0 verwendet?

Eine benutzerdefinierte Replikationsrichtlinie kann durch Implementieren der folgenden Schnittstelle erstellt werden.

/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {

    /** How to rename remote topics; generally should be like us-west.topic1. */
    String formatRemoteTopic(String sourceClusterAlias, String topic);
 
    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
        Returns null if not a remote topic.
    */
    String topicSource(String topic);
 
    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
        Topics may be replicated multiple hops, so the immediately upstream topic
        may itself be a remote topic.
        Returns null if not a remote topic.
    */
    String upstreamTopic(String topic);
 
    /** The name of the original source-topic, which may have been replicated multiple hops.
        Returns the topic if it is not a remote topic.
    */
    String originalTopic(String topic);
 
    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

Die Implementierung muss dem Kafka-Klassenpfad für den Klassenverweis hinzugefügt werden, der für „replication.policy.class“ in den MM2-Eigenschaften verwendet werden soll.

Nächste Schritte

Was ist Apache Kafka in HDInsight?

Referenzen

Apache-Dokumentation: Änderungen bei MirrorMaker 2.0

Einrichten von Clientzertifikaten für HDI Kafka

HDInsight Kafka

Dokumentation zu Apache Kafka 2.4

Verbinden eines lokalen Netzwerks mit Azure