Partager via


Comment utiliser Kafka MirrorMaker 2.0 dans la migration des données, la réplication et les cas d’usage.

MirrorMaker 2.0 (MM2) est conçu pour faciliter la mise en miroir ou la réplication de rubriques d’un cluster Kafka vers un autre. Il utilise l’infrastructure Kafka Connecter pour simplifier la configuration et la mise à l’échelle. Il détecte dynamiquement les modifications apportées aux rubriques et garantit que les propriétés de rubrique source et cible sont synchronisées, notamment les décalages et les partitions.

Dans cet article, vous apprendrez comment utiliser Kafka MirrorMaker 2.0 dans la migration/réplication de données et les cas d'utilisation

Prérequis

  • Environnement avec au moins deux clusters Kafka HDI.
  • Version Kafka supérieure à 2.4 (HDI 4.0)
  • Le cluster source doit avoir des points de données et des rubriques pour tester différentes fonctionnalités du processus de réplication MirrorMaker 2.0

Cas d’utilisation

Simulation de MirrorMaker 2.0 pour répliquer des points de données/décalages entre deux clusters Kafka dans HDInsight. La même solution peut être utilisée pour les scénarios tels que la réplication de données requise entre deux clusters Kafka tels que la récupération d’urgence, l’adaptation cloud, la géoréplication, l’isolation des données et l’agrégation de données.

Décalage de la réplication avec MirrorMaker 2.0

Internes MM2

L’outil MirrorMaker 2.0 est composé de différents connecteurs. Ces connecteurs sont des connecteurs Kafka Connecter standard, qui peuvent être utilisés directement avec Kafka Connecter en mode autonome ou distribué.

Le récapitulatif du processus de configuration du répartiteur est le suivant :

MirrorSourceConnector :

  1. Réplique les rubriques distantes, les configurations et les listes de contrôle d’accès de rubriques d’un seul cluster source.
  2. Émet des synchronisations de décalage vers une rubrique interne.

MirrorSinkConnector :

  1. Consomme à partir du cluster principal et réplique des rubriques vers un seul cluster cible.

MirrorCheckpointConnector :

  1. Consomme offset-syncs.
  2. Émet des points de contrôle pour activer les points de basculement.

MirrorHeartBeatConnector :

  1. Émet des pulsations vers des clusters distants, ce qui permet de surveiller le processus de réplication.

Déploiement

  1. Connecter-mirror-maker.sh script groupé avec la bibliothèque Kafka implémente un cluster MM2 distribué, qui gère les Connecter workers en interne en fonction d’un fichier de configuration. Le pilote MirrorMaker crée et gère en interne des paires de chaque connecteur : MirrorSourceConnector, MirrorSinkConnector, connecteur MirrorCheckpoint et MirrorHeartbeatConnector.

  2. Lancement de MirrorMaker 2.0.

    ./bin/connect-mirror-maker.sh ./config/mirror-maker.properties    
    

Notes

Pour les clusters Kerberos activés, la configuration JAAS doit être exportée vers le KAFKA_OPTS ou doit être spécifiée dans le fichier de configuration MM2.

export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"    

Exemple de fichier de configuration MirrorMaker 2.0

# specify any number of cluster aliases
clusters = source, destination

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092"  and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092

# enable and configure individual replication flows
source->destination.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"

# Setting replication factor of newly created remote topics
replication.factor=3

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1    

Configuration SSL

Si le programme d’installation nécessite une configuration SSL

destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password> 
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI

Configurations globales

Propriété Valeur par défaut Description
name obligatoire nom du connecteur, par exemple, « us-west->us-east »
topics chaîne vide regex des rubriques à répliquer, par exemple « topic1, topic2, topic3 ». Les listes séparées par des virgules sont également prises en charge.
topics.blacklist ". .internal, .. réplica, __consumer_offsets » ou similaire rubriques à exclure de la réplication
groups chaîne vide regex des groupes à répliquer, par exemple,» .* »
groups.blacklist chaîne vide groupes à exclure de la réplication
source.cluster.alias obligatoire nom du cluster en cours de réplication
target.cluster.alias obligatoire nom du cluster Kafka en aval
source.cluster.bootstrap.servers obligatoire cluster en amont à répliquer
target.cluster.bootstrap.servers obligatoire cluster en aval
sync.topic.configs.enabled true s’il faut surveiller ou non le cluster source pour les modifications de configuration
sync.topic.acls.enabled true s’il faut surveiller les listes de contrôle d’accès de cluster source pour les modifications
emit.heartbeats.enabled true le connecteur doit émettre régulièrement des pulsations
emit.heartbeats.interval.seconds true fréquence des pulsations
emit.checkpoints.enabled true le connecteur doit émettre régulièrement des informations de décalage du consommateur
emit.checkpoints.interval.seconds 5 (secondes) fréquence des points de contrôle
refresh.topics.enabled true le connecteur doit vérifier régulièrement les nouveaux groupes de consommateurs
refresh.topics.interval.seconds 5 (secondes) fréquence pour vérifier le cluster source pour les nouveaux groupes de consommateurs
refresh.groups.enabled true le connecteur doit vérifier régulièrement les nouveaux groupes de consommateurs
refresh.groups.interval.seconds 5 (secondes) fréquence pour vérifier le cluster source pour les nouveaux groupes de consommateurs
readahead.queue.capacity 500 (enregistrements) nombre d’enregistrements pour permettre au consommateur d’avancer le producteur
replication.policy.class org.apache.kafka.connect.mirror.DefaultReplicationPolicy utiliser LegacyReplicationPolicy pour imiter l’hérité MirrorMaker
heartbeats.topic.retention.ms un jour utilisé lors de la création de rubriques de pulsation pour la première fois
checkpoints.topic.retention.ms un jour utilisé lors de la création de rubriques de point de contrôle pour la première fois
offset.syncs.topic.retention.ms max long utilisé lors de la création d’une rubrique de synchronisation de décalage pour la première fois
replication.factor two utilisé lors de la création des rubriques distantes

Forum aux questions

Pourquoi voyons-nous une différence dans le dernier décalage sur le cluster source et de destination après la réplication d’une rubrique ?

Il est possible que les points de données de la rubrique source aient été vidés en raison desquels le nombre d’enregistrements réels serait inférieur à la dernière valeur de décalage. Cela entraîne la différence entre le dernier décalage sur la réplication du cluster source et de destination, car la réplication commence toujours par offset-0 du cluster de destination.

Comment les consommateurs se comportent-ils sur la migration, si le cluster de destination peut avoir un mappage de décalage différent vers des points de données ?

La fonctionnalité MirrorMaker 2.0 MirrorCheckpointConnector stocke automatiquement les points de contrôle de décalage de groupe de consommateurs pour les groupes de consommateurs sur le cluster source. Chaque point de contrôle contient un mappage du dernier décalage validé pour chaque groupe du cluster source vers le décalage équivalent dans le cluster de destination. Ainsi, sur la migration, les consommateurs qui commencent à consommer à partir de la même rubrique sur le cluster de destination pourront reprendre la réception de messages à partir du dernier décalage qu’ils ont validé sur le cluster source.

Comment conserver le nom exact de la rubrique dans le cluster de destination, car l’alias source est précédé de toutes les rubriques répliquées ?

Il s’agit du comportement par défaut dans MirrorMaker 2.0 pour éviter la substitution de données dans des topologies de mise en miroir complexes. La personnalisation de cette opération doit être effectuée avec soin en termes de conception de flux de réplication et de gestion des rubriques pour éviter toute perte de données. Cela peut être effectué à l’aide d’une classe de stratégie de réplication personnalisée par rapport à « replication.policy.class ».

Pourquoi voyons-nous de nouvelles rubriques internes créées dans ma source et destination Kafka ?

Les rubriques internes de MirrorMaker 2.0 sont créées par les connecteurs pour suivre le processus de réplication, la surveillance, le mappage de décalage et le point de contrôle.

Pourquoi MirrorMaker crée-t-il uniquement deux réplicas de la rubrique dans le cluster de destination alors que la source a plus ?

MirrorMaker 2 ne réplique pas le facteur de réplication des rubriques sur des clusters cibles. Cela peut être contrôlé à partir de la configuration MM2, en spécifiant le nombre requis de « replication.factor ». La valeur par défaut pour la même valeur est deux.

Comment utiliser une stratégie de réplication personnalisée dans MirrorMaker 2.0 ?

Vous pouvez créer une stratégie de réplication personnalisée en implémentant l’interface ci-dessous.

/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {

    /** How to rename remote topics; generally should be like us-west.topic1. */
    String formatRemoteTopic(String sourceClusterAlias, String topic);
 
    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
        Returns null if not a remote topic.
    */
    String topicSource(String topic);
 
    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
        Topics may be replicated multiple hops, so the immediately upstream topic
        may itself be a remote topic.
        Returns null if not a remote topic.
    */
    String upstreamTopic(String topic);
 
    /** The name of the original source-topic, which may have been replicated multiple hops.
        Returns the topic if it is not a remote topic.
    */
    String originalTopic(String topic);
 
    /** Internal topics are never replicated. */
    boolean isInternalTopic(String topic);
}

L’implémentation doit être ajoutée au chemin de classe Kafka pour que la référence de classe soit utilisée sur replication.policy.class dans les propriétés MM2.

Étapes suivantes

Présentation d’Apache Kafka sur HDInsight

Références

MirrorMaker 2.0 modifie Apache Doc

Configuration des certificats clients pour HDI Kafka

HDInsight Kafka

Documentation Apache Kafka 2.4

Connecter un réseau local à Azure