Intégrer la prise en charge Apache Kafka Connect à Azure Event Hubs

Apache Kafka Connect est une infrastructure permettant de connecter et d’importer/exporter des données depuis/vers n’importe quel système externe (p. ex. : MySQL), HDFS, et système de fichiers via un cluster Kafka. Ce tutoriel vous montre comment utiliser l’infrastructure Kafka Connect avec des Event Hubs.

Ce tutoriel vous guide tout au long de l’intégration de Kafka Connect avec un hub d’événements et du déploiement des connecteurs FileStreamSource et FileStreamSink de base. Bien que ces connecteurs ne soient pas destinés à une utilisation en production, ils présentent un scénario Kafka Connect de bout en bout où Azure Event Hubs agit comme un répartiteur Kafka.

Notes

Cet exemple est disponible sur GitHub.

Dans ce tutoriel, vous effectuez les étapes suivantes :

  • Créer un espace de noms Event Hubs
  • Cloner l’exemple de projet
  • Configurer Kafka Connect pour Event Hubs
  • Exécuter Kafka Connect
  • Créer des connecteurs

Prérequis

Pour suivre cette procédure pas à pas, vérifiez que les conditions préalables ci-dessous sont bien remplies :

Créer un espace de noms Event Hubs

Un espace de noms Event Hubs est requis pour échanger avec tout service Event Hubs. Pour obtenir des instructions sur la création d'un espace de noms et d'un Event Hub, consultez Créer un Event Hub. Obtenez la chaîne de connexion Event Hubs et le nom de domaine complet (FQDN) pour une utilisation ultérieure. Pour obtenir des instructions, consultez Obtenir une chaîne de connexion Event Hubs.

Cloner l’exemple de projet

Clonez le référentiel Azure Event Hubs et accédez au sous-dossier Tutoriels/Se connecter :

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurer Kafka Connect pour Event Hubs

Une reconfiguration minimale est nécessaire pour rediriger le débit Kafka Connect de Kafka vers Event Hubs. L’exemple connect-distributed.properties suivant montre comment configurer Connect pour s’authentifier et communiquer avec le point de terminaison Kafka sur Event Hubs :

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Important

Remplacez {YOUR.EVENTHUBS.CONNECTION.STRING} par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez Obtenir une chaîne de connexion Event Hubs. Voici un exemple de configuration : sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Exécuter Kafka Connect

Dans cette étape, un Worker Kafka Connect est démarré en local en mode distribué, à l’aide d’Event Hubs pour maintenir l’état du cluster.

  1. Enregistrez le fichier connect-distributed.properties ci-dessus en local. Veillez à remplacer toutes les valeurs entre accolades.
  2. Accédez à l’emplacement de la version de Kafka sur votre ordinateur.
  3. Exécutez ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. L’API REST du Worker Connect est prêt pour l’interaction lorsque vous voyez 'INFO Finished starting connectors and tasks'.

Notes

Kafka Connect utilise l’API Kafka AdminClient pour créer automatiquement des rubriques avec les configurations recommandées, y compris le compactage. Une vérification rapide de l’espace de noms dans le portail Azure révèle que les rubriques internes du Worker Connect ont été créées automatiquement.

Les rubriques internes Kafka Connect doivent utiliser le compactage. L’équipe Event Hubs n’est pas responsable de la résolution des configurations incorrectes si les rubriques Connect internes ne sont pas correctement configurées.

Créer des connecteurs

Cette section vous guide tout au long de l’ajout des connecteurs FileStreamSource et FileStreamSink.

  1. Créez un répertoire pour les fichiers de données d’entrée et de sortie.

    mkdir ~/connect-quickstart
    
  2. Créez deux fichiers : un avec les données de départ à partir duquel le connecteur FileStreamSource lit, et un autre dans lequel notre connecteur FileStreamSink écrit.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Créez un connecteur FileStreamSource. Veillez à remplacer les accolades par le chemin d’accès de votre répertoire de base.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Vous devriez voir l’Event Hub connect-quickstart sur votre instance Event Hubs après l’exécution de la commande ci-dessus.

  4. Vérifiez l’état du connecteur source.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Si vous le souhaitez, vous pouvez utiliser l’Explorateur Service Bus pour vérifier que les événements sont arrivés dans la rubrique connect-quickstart.

  5. Créez un connecteur FileStreamSink. Veillez de nouveau à remplacer les accolades par le chemin d’accès de votre répertoire de base.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Vérifiez l’état du connecteur récepteur.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Vérifiez que les données ont été répliquées entre les fichiers et que les données sont identiques entre les deux fichiers.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Nettoyage

Kafka Connect crée des rubriques Event Hubs pour stocker les configurations, les décalages et l’état qui persistent même après l’arrêt du cluster Connect. À moins que cette persistance ne soit souhaitée, il est recommandé de supprimer ces rubriques. Vous pouvez également supprimer les Event Hubs connect-quickstart qui ont été créés au cours de cette procédure pas à pas.

Étapes suivantes

Pour plus d’informations sur Event Hubs pour Kafka, consultez les articles suivants :