Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
S’APPLIQUE À : NoSQL
Kafka Connect pour Azure Cosmos DB est un connecteur permettant de lire et écrire des données dans Azure Cosmos DB. Le connecteur source Azure Cosmos DB permet de lire les données du flux de modification Azure Cosmos DB et de les publier dans une rubrique Kafka.
Prérequis
- Partez de la configuration de plateforme Confluent, qui offre un environnement complet. Si vous ne souhaitez pas utiliser la plateforme Confluent, vous devez installer et configurer vous-même ZooKeeper, Apache Kafka et Kafka Connect. Vous devez également installer et configurer manuellement les connecteurs Azure Cosmos DB.
- Créez un compte Azure Cosmos DB et un conteneur (guide de configuration)
- Munissez-vous d'un interpréteur de commandes Bash, testé sur GitHub Codespaces, Mac, Ubuntu et Windows avec WSL2. Cet interpréteur de commandes ne fonctionne pas dans Cloud Shell ou WSL1.
- Télécharger Java 11 (ou version ultérieure)
- Télécharger Maven
Installer le connecteur source
Si vous utilisez la configuration de plateforme Confluent recommandée, le connecteur source Azure Cosmos DB est inclus dans l’installation. Vous pouvez donc ignorer cette étape.
Sinon, vous pouvez utiliser le fichier JAR de la dernière mise en production et installer le connecteur manuellement. Pour plus d'informations, consultez ces instructions. Vous pouvez également empaqueter un nouveau fichier JAR à partir du code source :
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*dependencies.jar
Création d’une rubrique Kafka
Créez une rubrique Kafka à l'aide de l'outil Confluent Control Center. Dans ce scénario, nous allons créer une rubrique Kafka nommée « apparels » et y écrire des données JSON sans schéma incorporé. Pour créer une rubrique dans l'outil Control Center, consultez la documentation de création d'une rubrique Kafka.
Créer le connecteur source
Créer le connecteur source dans Kafka Connect
Pour créer le connecteur source Azure Cosmos DB dans Kafka Connect, utilisez la configuration JSON suivante. Veillez à remplacer les valeurs d'espace réservé des propriétés connect.cosmos.connection.endpoint
et connect.cosmos.master.key
que vous avez dû enregistrer à partir du guide de configuration d'Azure Cosmos DB dans les conditions préalables.
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "apparels#kafka",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
Pour plus d'informations sur chacune de ces propriétés de configuration, consultez la section Propriétés de la source. Une fois toutes les valeurs renseignées, enregistrez le fichier JSON en local. Vous pourrez l'utiliser pour créer le connecteur à l'aide de l'API REST.
Créer un connecteur à l'aide de l'outil Control Center
Une option simple pour créer le connecteur consiste à utiliser le portail Confluent Control Center. Suivez le guide de configuration de Confluent pour créer un connecteur à partir de Control Center. Lors de la configuration, plutôt que d'utiliser l'option DatagenConnector
, utilisez la vignette CosmosDBSourceConnector
. Lors de la configuration du connecteur source, renseignez les valeurs de la même manière que dans le fichier JSON.
Sur la page des connecteurs, vous pouvez également charger le fichier JSON créé à la section précédente à l'aide de l'option Charger le fichier de configuration du connecteur.
Créer un connecteur à l'aide de l'API REST
Créez le connecteur source à l'aide de l'API REST Connect
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
Insérer un document dans Azure Cosmos DB
Connectez-vous au portail Azure et accédez à votre compte Azure Cosmos DB.
Ouvrez l'onglet Exploration des données et sélectionnez Bases de données
Ouvrez la base de données « kafkaconnect » et le conteneur « kafka » créés précédemment.
Pour créer un nouveau document JSON, dans le volet API pour NoSQL, développez le conteneur « kafka », sélectionnez Éléments, puis sélectionnez Nouvel élément dans la barre d'outils.
À présent, ajoutez un document au conteneur, avec la structure suivante. Collez l'exemple de code JSON suivant dans l'onglet Éléments, en écrasant le contenu existant :
{ "id": "2", "productId": "33218897", "category": "Women's Outerwear", "manufacturer": "Contoso", "description": "Black wool pea-coat", "price": "49.99", "shipping": { "weight": 2, "dimensions": { "width": 8, "height": 11, "depth": 3 } } }
Sélectionnez Enregistrer.
Vérifiez que le document a été enregistré en consultant les éléments du menu de gauche.
Vérifier les données écrites dans la rubrique Kafka
- Ouvrez l'interface utilisateur de la rubrique Kafka sur
http://localhost:9000
. - Sélectionnez la rubrique Kafka « apparels » que vous avez créée.
- Vérifiez que le document que vous avez précédemment inséré dans Azure Cosmos DB apparaît dans la rubrique Kafka.
Nettoyage
Pour supprimer le connecteur source que vous avez créé dans Confluent Control Center, accédez à celui-ci, puis sélectionnez l'icône Supprimer.
Vous pouvez également utiliser l'API REST du connecteur :
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector
Pour supprimer le service Azure Cosmos DB et son groupe de ressources à l'aide d'Azure CLI, consultez cette procédure.
Propriétés de la configuration de la source
Les paramètres suivants sont utilisés pour configurer le connecteur source Kafka. Ces valeurs de configuration déterminent quel conteneur Azure Cosmos DB est utilisé, les données à partir desquelles les rubriques Kafka sont écrites et les formats utilisés pour sérialiser les données. Pour accéder à un exemple comportant les valeurs par défaut, consultez ce fichier de configuration.
Nom | Catégorie | Descriptif | Obligatoire ou facultatif |
---|---|---|---|
connector.class | Chaîne | Nom de la classe de la source Azure Cosmos DB. Celui-ci doit être défini sur com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector |
Obligatoire |
connect.cosmos.databasename | Chaîne | Nom de la base de données à lire. | Obligatoire |
connect.cosmos.master.key | Chaîne | Clé primaire Azure Cosmos DB. | Obligatoire |
connect.cosmos.connection.endpoint | URI | Point de terminaison du compte. | Obligatoire |
connect.cosmos.containers.topicmap | Chaîne | Mappages entre rubrique et conteneur, séparés par des virgules. Par exemple, topic1#coll1, topic2#coll2 | Obligatoire |
connect.cosmos.connection.gateway.enabled | booléen | Indicateur pour indiquer s’il faut utiliser le mode passerelle. La valeur par défaut est false. | Facultatif |
connect.cosmos.messagekey.enabled | Booléen | Cette valeur indique si la clé de message Kafka doit être définie. La valeur par défaut est true |
Obligatoire |
connect.cosmos.messagekey.field | Chaîne | Utilisez la valeur du champ du document comme clé de message. La valeur par défaut est id . |
Obligatoire |
connect.cosmos.offset.useLatest | Booléen | Définissez cette propriété sur true pour utiliser le décalage source le plus récent. Définissez-la sur false pour utiliser le décalage enregistré le plus ancien. La valeur par défaut est false . |
Obligatoire |
connect.cosmos.task.poll.interval | Int | Intervalle d'interrogation du conteneur du flux de modification. | Obligatoire |
key.converter | Chaîne | Format de sérialisation des données de clé écrites dans la rubrique Kafka. | Obligatoire |
convertisseur de valeurs | Chaîne | Format de sérialisation des données de valeur écrites dans la rubrique Kafka. | Obligatoire |
key.converter.schemas.enable | Chaîne | Définissez cette propriété sur true si les données de clé possèdent un schéma incorporé. |
Facultatif |
value.converter.schemas.enable | Chaîne | Définissez cette propriété sur true si les données de clé possèdent un schéma incorporé. |
Facultatif |
tasks.max | Int | Nombre maximum de tâches sources de connecteurs. La valeur par défaut est 1 . |
Facultatif |
Types de données pris en charge
Le connecteur source Azure Cosmos DB convertit le document JSON en schéma et prend en charge les types de données JSON suivants :
Type de données JSON | Type de schéma |
---|---|
Array | Array |
Booléen | Booléen |
Numéro | Float32 Float64 Int8 Int16 Int32 Int64 |
Zéro | Chaîne |
Objet (JSON) | Structure |
Chaîne | Chaîne |
Étapes suivantes
- Kafka Connect pour Azure Cosmos DB connecteur récepteur