Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
MirrorMaker 2.0 (MM2) est conçu pour faciliter la mise en miroir ou la réplication de rubriques d’un cluster Kafka vers un autre. Il utilise l’infrastructure Kafka Connecter pour simplifier la configuration et la mise à l’échelle. Il détecte dynamiquement les modifications apportées aux rubriques et garantit que les propriétés de rubrique source et cible sont synchronisées, notamment les décalages et les partitions.
Dans cet article, vous apprendrez comment utiliser Kafka MirrorMaker 2.0 dans la migration/réplication de données et les cas d'utilisation
Prérequis
- Environnement avec au moins deux clusters Kafka HDI.
- Version Kafka supérieure à 2.4 (HDI 4.0)
- Le cluster source doit avoir des points de données et des rubriques pour tester différentes fonctionnalités du processus de réplication MirrorMaker 2.0
Cas d’utilisation
Simulation de MirrorMaker 2.0 pour répliquer des points de données/décalages entre deux clusters Kafka dans HDInsight. La même solution peut être utilisée pour les scénarios tels que la réplication de données requise entre deux clusters Kafka tels que la récupération d’urgence, l’adaptation cloud, la géoréplication, l’isolation des données et l’agrégation de données.
Décalage de la réplication avec MirrorMaker 2.0
Internes MM2
L’outil MirrorMaker 2.0 est composé de différents connecteurs. Ces connecteurs sont des connecteurs Kafka Connecter standard, qui peuvent être utilisés directement avec Kafka Connecter en mode autonome ou distribué.
Le récapitulatif du processus de configuration du répartiteur est le suivant :
MirrorSourceConnector :
- Réplique les rubriques distantes, les configurations et les listes de contrôle d’accès de rubriques d’un seul cluster source.
- Émet des synchronisations de décalage vers une rubrique interne.
MirrorSinkConnector :
- Consomme à partir du cluster principal et réplique des rubriques vers un seul cluster cible.
MirrorCheckpointConnector :
- Consomme offset-syncs.
- Émet des points de contrôle pour activer les points de basculement.
MirrorHeartBeatConnector :
- Émet des pulsations vers des clusters distants, ce qui permet de surveiller le processus de réplication.
Déploiement
Connecter-mirror-maker.sh script groupé avec la bibliothèque Kafka implémente un cluster MM2 distribué, qui gère les Connecter workers en interne en fonction d’un fichier de configuration. Le pilote MirrorMaker crée et gère en interne des paires de chaque connecteur : MirrorSourceConnector, MirrorSinkConnector, connecteur MirrorCheckpoint et MirrorHeartbeatConnector.
Lancement de MirrorMaker 2.0.
./bin/connect-mirror-maker.sh ./config/mirror-maker.properties
Notes
Pour les clusters Kerberos activés, la configuration JAAS doit être exportée vers le KAFKA_OPTS ou doit être spécifiée dans le fichier de configuration MM2.
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"
Exemple de fichier de configuration MirrorMaker 2.0
# specify any number of cluster aliases
clusters = source, destination
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
# enable and configure individual replication flows
source->destination.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"
# Setting replication factor of newly created remote topics
replication.factor=3
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
Configuration SSL
Si le programme d’installation nécessite une configuration SSL
destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password>
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI
Configurations globales
Propriété | Valeur par défaut | Description |
---|---|---|
name | obligatoire | nom du connecteur, par exemple, « us-west->us-east » |
topics | chaîne vide | regex des rubriques à répliquer, par exemple « topic1, topic2, topic3 ». Les listes séparées par des virgules sont également prises en charge. |
topics.blacklist | ". .internal, .. réplica, __consumer_offsets » ou similaire | rubriques à exclure de la réplication |
groups | chaîne vide | regex des groupes à répliquer, par exemple,» .* » |
groups.blacklist | chaîne vide | groupes à exclure de la réplication |
source.cluster.alias | obligatoire | nom du cluster en cours de réplication |
target.cluster.alias | obligatoire | nom du cluster Kafka en aval |
source.cluster.bootstrap.servers | obligatoire | cluster en amont à répliquer |
target.cluster.bootstrap.servers | obligatoire | cluster en aval |
sync.topic.configs.enabled | true | s’il faut surveiller ou non le cluster source pour les modifications de configuration |
sync.topic.acls.enabled | true | s’il faut surveiller les listes de contrôle d’accès de cluster source pour les modifications |
emit.heartbeats.enabled | true | le connecteur doit émettre régulièrement des pulsations |
emit.heartbeats.interval.seconds | true | fréquence des pulsations |
emit.checkpoints.enabled | true | le connecteur doit émettre régulièrement des informations de décalage du consommateur |
emit.checkpoints.interval.seconds | 5 (secondes) | fréquence des points de contrôle |
refresh.topics.enabled | true | le connecteur doit vérifier régulièrement les nouveaux groupes de consommateurs |
refresh.topics.interval.seconds | 5 (secondes) | fréquence pour vérifier le cluster source pour les nouveaux groupes de consommateurs |
refresh.groups.enabled | true | le connecteur doit vérifier régulièrement les nouveaux groupes de consommateurs |
refresh.groups.interval.seconds | 5 (secondes) | fréquence pour vérifier le cluster source pour les nouveaux groupes de consommateurs |
readahead.queue.capacity | 500 (enregistrements) | nombre d’enregistrements pour permettre au consommateur d’avancer le producteur |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | utiliser LegacyReplicationPolicy pour imiter l’hérité MirrorMaker |
heartbeats.topic.retention.ms | un jour | utilisé lors de la création de rubriques de pulsation pour la première fois |
checkpoints.topic.retention.ms | un jour | utilisé lors de la création de rubriques de point de contrôle pour la première fois |
offset.syncs.topic.retention.ms | max long | utilisé lors de la création d’une rubrique de synchronisation de décalage pour la première fois |
replication.factor | two | utilisé lors de la création des rubriques distantes |
Forum aux questions
Pourquoi voyons-nous une différence dans le dernier décalage sur le cluster source et de destination après la réplication d’une rubrique ?
Il est possible que les points de données de la rubrique source aient été vidés en raison desquels le nombre d’enregistrements réels serait inférieur à la dernière valeur de décalage. Cela entraîne la différence entre le dernier décalage sur la réplication du cluster source et de destination, car la réplication commence toujours par offset-0 du cluster de destination.
Comment les consommateurs se comportent-ils sur la migration, si le cluster de destination peut avoir un mappage de décalage différent vers des points de données ?
La fonctionnalité MirrorMaker 2.0 MirrorCheckpointConnector stocke automatiquement les points de contrôle de décalage de groupe de consommateurs pour les groupes de consommateurs sur le cluster source. Chaque point de contrôle contient un mappage du dernier décalage validé pour chaque groupe du cluster source vers le décalage équivalent dans le cluster de destination. Ainsi, sur la migration, les consommateurs qui commencent à consommer à partir de la même rubrique sur le cluster de destination pourront reprendre la réception de messages à partir du dernier décalage qu’ils ont validé sur le cluster source.
Comment conserver le nom exact de la rubrique dans le cluster de destination, car l’alias source est précédé de toutes les rubriques répliquées ?
Il s’agit du comportement par défaut dans MirrorMaker 2.0 pour éviter la substitution de données dans des topologies de mise en miroir complexes. La personnalisation de cette opération doit être effectuée avec soin en termes de conception de flux de réplication et de gestion des rubriques pour éviter toute perte de données. Cela peut être effectué à l’aide d’une classe de stratégie de réplication personnalisée par rapport à « replication.policy.class ».
Pourquoi voyons-nous de nouvelles rubriques internes créées dans ma source et destination Kafka ?
Les rubriques internes de MirrorMaker 2.0 sont créées par les connecteurs pour suivre le processus de réplication, la surveillance, le mappage de décalage et le point de contrôle.
Pourquoi MirrorMaker crée-t-il uniquement deux réplicas de la rubrique dans le cluster de destination alors que la source a plus ?
MirrorMaker 2 ne réplique pas le facteur de réplication des rubriques sur des clusters cibles. Cela peut être contrôlé à partir de la configuration MM2, en spécifiant le nombre requis de « replication.factor ». La valeur par défaut pour la même valeur est deux.
Comment utiliser une stratégie de réplication personnalisée dans MirrorMaker 2.0 ?
Vous pouvez créer une stratégie de réplication personnalisée en implémentant l’interface ci-dessous.
/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {
/** How to rename remote topics; generally should be like us-west.topic1. */
String formatRemoteTopic(String sourceClusterAlias, String topic);
/** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
Returns null if not a remote topic.
*/
String topicSource(String topic);
/** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
Topics may be replicated multiple hops, so the immediately upstream topic
may itself be a remote topic.
Returns null if not a remote topic.
*/
String upstreamTopic(String topic);
/** The name of the original source-topic, which may have been replicated multiple hops.
Returns the topic if it is not a remote topic.
*/
String originalTopic(String topic);
/** Internal topics are never replicated. */
boolean isInternalTopic(String topic);
}
L’implémentation doit être ajoutée au chemin de classe Kafka pour que la référence de classe soit utilisée sur replication.policy.class dans les propriétés MM2.
Étapes suivantes
Présentation d’Apache Kafka sur HDInsight
Références
MirrorMaker 2.0 modifie Apache Doc
Configuration des certificats clients pour HDI Kafka