Partager via


Kafka Connect pour Azure Cosmos DB - Connecteur de récepteur

S’APPLIQUE À : NoSQL

Kafka Connect pour Azure Cosmos DB est un connecteur permettant de lire et d’écrire des données dans Azure Cosmos DB. Le connecteur du récepteur Azure Cosmos DB vous permet d’exporter des données de rubriques Apache Kafka vers une base de données Azure Cosmos DB. Le connecteur interroge les données de Kafka pour écrire dans les conteneurs de la base de données en fonction de l’abonnement aux rubriques.

Prérequis

  • Partez de la configuration de plateforme Confluent, qui offre un environnement complet. Si vous ne souhaitez pas utiliser la plateforme Confluent, vous devrez installer et configurer vous-même Apache Kafka et Kafka Connect. Vous devez également installer et configurer manuellement les connecteurs Azure Cosmos DB.
  • Créez un compte Azure Cosmos DB et un conteneur (guide de configuration)
  • Le shell Bash, testé sur GitHub Codespaces, Mac, Ubuntu et Windows avec WSL2. Cet interpréteur de commandes ne fonctionne pas dans Cloud Shell ou WSL1.
  • Télécharger Java 11 (ou version ultérieure)
  • Téléchargez Maven.

Installation du connecteur du récepteur

Si vous utilisez la configuration de plateforme Confluent recommandée, le connecteur du récepteur Azure Cosmos DB est inclus dans l’installation. Vous pouvez donc ignorer cette étape.

Sinon, vous pouvez télécharger le fichier JAR de la dernière Release ou créer un package à partir de ce référentiel pour créer un nouveau fichier JAR. Pour installer manuellement le connecteur à l’aide du fichier JAR, consultez ces instructions. Vous pouvez également empaqueter un nouveau fichier JAR à partir du code source.

# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar

Création d’une rubrique Kafka et écriture de données

Si vous utilisez la plateforme Confluent, le moyen le plus simple de créer une rubrique Kafka consiste à passer par l’expérience utilisateur du centre de contrôle fournie. Dans le cas contraire, vous pouvez créer manuellement une rubrique Kafka suivant cette syntaxe :

./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

Dans ce scénario, nous allons créer une rubrique Kafka nommée « hotels » et y écrire des données JSON sans schéma incorporé. Pour créer une rubrique dans le centre de contrôle, consultez le guide de Confluent.

Ensuite, lancez le producteur de la console Kafka pour écrire quelques enregistrements dans la rubrique « hotels ».

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

Dans le producteur de la console, entrez le code suivant :

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

Les trois enregistrements entrés sont publiés dans la rubrique Kafka « hotels » au format JSON.

Créer le connecteur de puits

Créez un connecteur de récepteur Azure Cosmos DB dans Kafka Connect. Le corps JSON suivant définit la configuration du connecteur du récepteur. Veillez à remplacer la valeur des propriétés connect.cosmos.connection.endpoint et connect.cosmos.master.key, que vous devez avoir enregistrées dans les prérequis du guide de configuration d’Azure Cosmos DB.

Pour plus d’informations sur chacune de ces propriétés de configuration, consultez les propriétés du 'sink'.

{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": [
      "hotels"
    ],
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "hotels#kafka"
  }
}

Une fois que toutes les valeurs sont remplies, enregistrez le fichier JSON en local. Vous pourrez l'utiliser pour créer le connecteur à l'aide de l'API REST.

Création d’un connecteur avec le centre de contrôle

Une option simple pour créer le connecteur consiste à passer par la page web du centre de contrôle. Suivez ce guide d’installation pour créer un connecteur à partir du centre de contrôle. Au lieu d’utiliser l’option DatagenConnector, utilisez la vignette CosmosDBSinkConnector. Quand vous configurez le connecteur du récepteur, renseignez les valeurs de la même manière que pour le fichier JSON que vous avez rempli.

Vous pouvez également, sur la page des connecteurs, charger le fichier JSON créé avec l’option Chargement du fichier de configuration du connecteur.

Capture d’écran de l’option Télécharger le fichier de configuration du connecteur dans la boîte de dialogue Parcourir les connecteurs.

Création d’un connecteur avec l’API REST

Créez le connecteur de collecteur à l'aide de l'API REST Connect.

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Vérification des données écrites dans Azure Cosmos DB

Connectez-vous au portail Azure et accédez à votre compte Azure Cosmos DB. Vérifiez que les trois enregistrements de la rubrique « hotels » sont créés dans votre compte.

Nettoyage

Pour supprimer le connecteur du Centre de contrôle, accédez au connecteur de destination que vous avez créé et sélectionnez l'icône Supprimer.

Capture d’écran de l’option de suppression dans la boîte de dialogue du connecteur de récepteur.

Vous pouvez également utiliser l’API REST Connect pour le supprimer :

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

Pour supprimer le service Azure Cosmos DB créé et son groupe de ressources avec Azure CLI, consultez cette procédure.

Propriétés de configuration du récepteur

Les paramètres suivants sont utilisés pour configurer un connecteur de réception Kafka pour Azure Cosmos DB. Ces valeurs de configuration déterminent dans quelles rubriques Kafka les données sont consommées, dans quels conteneurs Azure Cosmos DB elles sont écrites et quels formats sont utilisés pour les sérialiser. Pour voir un exemple de fichier de configuration comportant les valeurs par défaut, consultez cette configuration.

Nom Catégorie Descriptif Obligatoire ou facultatif
Rubriques liste Liste de rubriques Kafka à surveiller. Obligatoire
connector.class chaîne Nom de la classe du récepteur Azure Cosmos DB. Doit avoir la valeur com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector. Obligatoire
connect.cosmos.connection.endpoint URI Chaîne d’URI du point de terminaison Azure Cosmos DB. Obligatoire
connect.cosmos.master.key chaîne Clé primaire Azure Cosmos DB à laquelle le récepteur se connecte. Obligatoire
connect.cosmos.databasename chaîne Nom de la base de données Azure Cosmos DB dans laquelle le récepteur écrit. Obligatoire
connect.cosmos.containers.topicmap chaîne Mappage entre les rubriques Kafka et les conteneurs Azure Cosmos DB, au format CSV : topic#container,topic2#container2. Obligatoire
connect.cosmos.connection.gateway.enabled booléen Indicateur pour indiquer s’il faut utiliser le mode passerelle. La valeur par défaut est false. Facultatif
connect.cosmos.sink.bulk.enabled booléen Indicateur pour indiquer si le mode en bloc est activé. Par défaut, il l’est. Facultatif
connect.cosmos.sink.maxRetryCount entier Nombre maximal de nouvelles tentatives en cas d’échecs d’écriture temporaires. Par défaut, il est de 10 fois. Facultatif
key.converter chaîne Format de sérialisation des données clés écrites dans le sujet Kafka. Obligatoire
convertisseur de valeurs chaîne Format de sérialisation des données de valeur écrites dans la rubrique Kafka. Obligatoire
key.converter.schemas.enable chaîne Propriété définie sur « true » si les données de clé possèdent un schéma incorporé. Facultatif
value.converter.schemas.enable chaîne Propriété définie sur « true » si les données de clé possèdent un schéma incorporé. Facultatif
tasks.max entier Nombre maximal de tâches du récepteur du connecteur. La valeur par défaut est 1 Facultatif

Les données sont toujours écrites dans Azure Cosmos DB au format JSON sans aucun schéma.

Types de données pris en charge

Le connecteur du récepteur Azure Cosmos DB convertit l’enregistrement du récepteur en document JSON prenant en charge les types de schémas suivants :

Type de schéma Type de données JSON
Tableau Tableau
booléen booléen
Float32 Numéro
Float64 Numéro
Int8 Numéro
Int16 Numéro
Int32 Numéro
Int64 Numéro
Carte Objet (JSON)
Chaîne Chaîne
Zéro
Struct Objet (JSON)

Le connecteur du récepteur prend également en charge les types logiques Avro suivants :

Type de schéma Type de données JSON
Date (Jour/Mois/Année) Numéro
Heure Numéro
Timestamp Numéro

Remarque

La désérialisation d’octets n’est à l’heure actuelle pas prise en charge par le connecteur du récepteur Azure Cosmos DB.

Transformations de message unique (SMT)

En parallèle des paramètres du connecteur du récepteur, vous pouvez spécifier l’utilisation des transformations de message unique (SMT, Single Message Transform) pour modifier les messages qui transitent par la plateforme Kafka Connect. Pour plus d’informations, consultez la documentation Confluent SMT.

Utilisation de la transformation SMT InsertUUID

La fonction SMT InsertUUID permet d’ajouter automatiquement des identificateurs d’éléments. Avec la transformation SMT InsertUUID personnalisée, vous pouvez insérer le champ id avec une valeur UUID aléatoire pour chaque message, avant qu’il ne soit écrit dans Azure Cosmos DB.

Avertissement

N’utilisez cette transformation SMT que si les messages ne contiennent pas le champ id. Dans le cas contraire, les valeurs id sont remplacées ; vous risquez donc de vous retrouver avec des doublons d’éléments dans votre base de données. Il peut être rapide et facile d’utiliser des UUID comme identificateurs de messages, mais il ne s’agit pas d’une clé de partition idéale dans Azure Cosmos DB.

Installer le SMT

Pour pouvoir utiliser la transformation SMT InsertUUID, vous devez l’installer dans votre configuration de plateforme Confluent. Si vous vous servez de la configuration de plateforme Confluent de ce référentiel, la transformation est déjà incluse dans l’installation. Vous pouvez donc ignorer cette étape.

Vous pouvez également empaqueter la source InsertUUID pour créer un fichier JAR. Pour installer manuellement le connecteur à l’aide du fichier JAR, consultez ces instructions.

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

Configurer le SMT

Dans la configuration de votre connecteur de sink, ajoutez les propriétés suivantes pour définir id.

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

Pour plus d’informations sur l’utilisation de SMT, consultez le référentiel InsertUUID.

Utilisation de SMT pour configurer la durée de vie (TTL)

En utilisant à la fois les SMT InsertField et Cast, vous pouvez configurer la durée de vie (TTL, Time to Live) pour chaque élément créé dans Azure Cosmos DB. Activez la durée de vie sur le conteneur avant de l’activer au niveau de l’élément. Pour plus d’informations, consultez la documentation Durée de vie.

Dans la configuration du connecteur Sink, ajouter les propriétés suivantes pour définir le TTL en secondes. Dans l’exemple suivant, la durée de vie est définie sur 100 secondes. Si le message contient déjà le champ TTL, la valeur TTL est remplacée par ces transformations SMT.

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

Pour plus d’informations sur l’utilisation de ces SMT, consultez la documentation InsertField et Cast.

Résolution des problèmes courants

Voici des solutions à certains problèmes courants que vous pouvez rencontrer en utilisant le connecteur du récepteur Kafka.

Lecture de données qui ne sont pas au format JSON avec JsonConverter

Si votre rubrique source comprend des données qui ne sont pas au format JSON dans Kafka et que vous tentez de les lire avec JsonConverter, vous obtenez l’exception suivante :

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7

Cette erreur est probablement due à la sérialisation des données de la rubrique source au format Avro ou autre, par exemple une chaîne CSV.

Solution : si les données de la rubrique sont au format Avro, modifiez le connecteur du récepteur Kafka Connect de sorte qu’il utilise AvroConverter, comme indiqué ci-dessous.

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Prise en charge du mode de passerelle

connect.cosmos.connection.gateway.enabled est une option de configuration pour le connecteur de réception Kafka de Cosmos DB qui optimise l'ingestion de données en recourant au service de passerelle de Cosmos DB. Ce service fait office de front-end pour Cosmos DB, offrant des avantages tels que l’équilibrage de charge, le routage des requêtes et la traduction de protocole. En tirant parti du service de passerelle, le connecteur améliore le débit et la scalabilité lors de l’écriture de données dans Cosmos DB. Pour plus d’informations, consultez les modes de connectivité.

"connect.cosmos.connection.gateway.enabled": true

Prise en charge du mode Rafale

La propriété connect.cosmos.sink.bulk.enabled détermine si la fonctionnalité d’écriture en bloc est activée pour l’écriture de données à partir de rubriques Kafka dans Azure Cosmos DB.

Lorsque cette propriété est définie sur true (par défaut), elle active le mode d’écriture en bloc, ce qui permet à Kafka Connect d’utiliser l’API d’importation en bloc d’Azure Cosmos DB pour effectuer des écritures par lots efficaces à l’aide de la méthode CosmosContainer.executeBulkOperations(). Le mode d’écriture en bloc améliore considérablement les performances d’écriture et réduit la latence globale lors de l’ingestion de données dans Cosmos DB par rapport au mode non-bloc lorsque la méthode CosmosContainer.upsertItem() est utilisée.

Le mode en bloc est activé par défaut. Pour désactiver la propriété connect.cosmos.sink.bulk.enabled, vous devez la définir sur false dans la configuration du connecteur récepteur Cosmos DB. Voici un exemple de fichier de propriété de configuration :

"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false

En activant la propriété connect.cosmos.sink.bulk.enabled, vous pouvez tirer parti de la fonctionnalité d’écriture en bloc dans Kafka Connect pour Azure Cosmos DB pour obtenir des performances d’écriture améliorées lors de la réplication de données à partir de rubriques Kafka vers Azure Cosmos DB.

"connect.cosmos.sink.bulk.enabled": true

Lecture de données qui ne sont pas au format Avro avec AvroConverter

Ce scénario s’applique lorsque vous essayez d’utiliser le convertisseur Avro pour lire des données d’une rubrique qui ne sont pas au format Avro. Sont incluses les données écrites par un sérialiseur Avro autre que celui de Confluent Schema Registry, qui possède son propre format filaire.

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Solution : vérifiez le format de sérialisation de la rubrique source. Ensuite, basculez le connecteur du récepteur Kafka Connect vers le convertisseur approprié ou remplacez le format en amont par Avro.

Lecture d’un message JSON sans la structure schéma/charge utile attendue

Kafka Connect prend en charge une structure spéciale de messages JSON contenant à la fois la charge utile et le schéma.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

Si vous tentez de lire des données JSON qui ne contiennent pas les données de cette structure, vous obtenez l’erreur suivante :

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

En termes clairs, la seule structure JSON valide pour schemas.enable=true possède pour éléments de niveau supérieur des champs de schéma et de charge utile (cf. ci-dessus). Comme l’indique le message d’erreur, si vous ne disposez que de données JSON brutes, modifiez ainsi la configuration de votre connecteur :

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Limites

  • L’autocréation de bases de données et de conteneurs dans Azure Cosmos DB n’est pas prise en charge. La base de données et les conteneurs doivent déjà exister et être configurés correctement.

Étapes suivantes

Pour plus d’informations sur le flux de modification dans Azure Cosmo DB, consultez les documents suivants :

Vous pouvez en apprendre davantage sur les opérations en bloc dans le kit SDK Java V4 à l’aide des documents suivants :