Partager via


Ingérer des données d’Azure Cosmos DB dans Azure Data Explorer

Azure Data Explorer prend en charge l’ingestion de données à partir d’Azure Cosmos DB for NoSql à l’aide d’un flux de modification. La connexion de données de flux de modification Cosmos DB est un pipeline d’ingestion qui écoute votre flux de modification Cosmos DB et ingère les données dans votre table Data Explorer. Le flux de modification écoute les documents nouveaux et mis à jour, mais ne journalise pas les suppressions. Pour obtenir des informations générales sur l’ingestion de données dans Azure Data Explorer, consultez Vue d’ensemble de l’ingestion des données dans Azure Data Explorer.

Chaque connexion de données écoute un conteneur Cosmos DB spécifique et ingère des données dans une table spécifiée (plusieurs connexions peuvent ingérer dans une table unique). La méthode d’ingestion prend en charge l’ingestion en streaming (lorsqu’elle est activée) et l’ingestion en file d’attente.

Dans cet article, vous allez apprendre à configurer une connexion de données de flux de modification Cosmos DB pour ingérer des données dans Azure Data Explorer avec l’identité managée du système. Passez en revue les considérations avant de commencer.

Pour configurer un connecteur, procédez comme suit :

Étape 1 : Choisir une table Azure Data Explorer et configurer son mappage de table

Étape 2 : Créer une connexion de données Cosmos DB

Étape 3 : Tester la connexion de données

Prérequis

Étape 1 : Choisir une table Azure Data Explorer et configurer son mappage de table

Avant de créer une connexion de données, créez une table dans laquelle vous allez stocker les données ingérées et appliquer un mappage qui correspond au schéma dans le conteneur Cosmos DB source. Si votre scénario nécessite plus qu’un simple mappage de champs, vous pouvez utiliser des stratégies de mise à jour pour transformer et mapper les données ingérées à partir de votre flux de modification.

Voici un exemple de schéma d’un élément dans le conteneur Cosmos DB :

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

Procédez comme suit pour créer une table et appliquer un mappage de table :

  1. Dans l’interface utilisateur web Azure Data Explorer, dans le menu de navigation de gauche, sélectionnez Requête, puis sélectionnez la base de données dans laquelle vous souhaitez créer la table.

  2. Exécutez la commande suivante pour créer une table appelée TestTable.

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. Exécutez la commande suivante pour créer le mappage de tables.

    La commande mappe les propriétés personnalisées d’un document JSON Cosmos DB aux colonnes de la table TestTable , comme suit :

    Propriété Cosmos DB Colonne de tableau Transformation
    id Id None
    name Nom None
    _Ts _ts None
    _Ts _Timestamp Utilise DateTimeFromUnixSeconds pour transformer_ts (secondes UNIX) en _timestamp (datetime))

    Notes

    Nous vous recommandons d’utiliser les colonnes d’horodatage suivantes :

    • _ts : utilisez cette colonne pour rapprocher les données avec Cosmos DB.
    • _timestamp : utilisez cette colonne pour exécuter des filtres de temps efficaces dans vos requêtes Kusto. Pour plus d’informations, consultez Meilleures pratiques en matière de requête.
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

Transformer et mapper des données avec des stratégies de mise à jour

Si votre scénario nécessite plus qu’un simple mappage de champs, vous pouvez utiliser des stratégies de mise à jour pour transformer et mapper les données ingérées à partir de votre flux de modification.

Les stratégies de mise à jour sont un moyen de transformer des données à mesure qu’elles sont ingérées dans votre table. Ils sont écrits en Langage de requête Kusto et sont exécutés sur le pipeline d’ingestion. Ils peuvent être utilisés pour transformer des données à partir d’une ingestion de flux de modification Cosmos DB, comme dans les scénarios suivants :

  • Vos documents contiennent des tableaux qui seraient plus faciles à interroger s’ils sont transformés en plusieurs lignes à l’aide de l’opérateur mv-expand .
  • Vous souhaitez filtrer les documents. Par exemple, vous pouvez filtrer les documents par type à l’aide de l’opérateur where .
  • Vous disposez d’une logique complexe qui ne peut pas être représentée dans un mappage de table.

Pour plus d’informations sur la création et la gestion des stratégies de mise à jour, consultez Vue d’ensemble des stratégies de mise à jour.

Étape 2 : Créer une connexion de données Cosmos DB

Vous pouvez utiliser les méthodes suivantes pour créer le connecteur de données :

  1. Dans le Portail Azure, accédez à la page vue d’ensemble de votre cluster, puis sélectionnez l’onglet Prise en main.

  2. Dans la vignette Ingestion de données, sélectionnez Créer une connexion de> donnéesCosmos DB.

    Capture d’écran de l’onglet Prise en main, montrant l’option Créer une connexion de données Cosmos DB.

  3. Dans le volet Créer une connexion de données Cosmos DB, remplissez le formulaire avec les informations du tableau :

    Capture d’écran du volet de connexion de données, montrant les champs de formulaire avec des valeurs.

    Champ Description
    Nom de la base de données Choisissez la base de données Azure Data Explorer dans laquelle vous souhaitez ingérer des données.
    Nom de la connexion de données Spécifiez un nom pour la connexion de données.
    Abonnement Sélectionnez l’abonnement qui contient votre compte Cosmos DB NoSQL.
    Compte Cosmos DB Choisissez le compte Cosmos DB à partir duquel vous souhaitez ingérer des données.
    Base de données SQL Choisissez la base de données Cosmos DB à partir de laquelle vous souhaitez ingérer des données.
    Conteneur SQL Choisissez le conteneur Cosmos DB à partir duquel vous souhaitez ingérer des données.
    Nom de la table Spécifiez le nom de la table Azure Data Explorer dans laquelle vous souhaitez ingérer des données.
    Nom du mappage Si vous le souhaitez, spécifiez le nom de mappage à utiliser pour la connexion de données.
  4. Si vous le souhaitez, sous la section Paramètres avancés , procédez comme suit :

    1. Spécifiez la date de début de la récupération des événements. Il s’agit de l’heure à partir de laquelle le connecteur commencera à ingérer des données. Si vous ne spécifiez pas d’heure, le connecteur commencera à ingérer les données à partir du moment où vous créez la connexion de données. Le format de date recommandé est la norme UTC ISO 8601, spécifiée comme suit : yyyy-MM-ddTHH:mm:ss.fffffffZ.

    2. Sélectionnez Affecté par l’utilisateur , puis sélectionnez l’identité. Par défaut, l’identité managée affectée par le système est utilisée par la connexion. Si nécessaire, vous pouvez utiliser une identité affectée par l’utilisateur .

      Capture d’écran du volet de connexion de données, montrant les paramètres Avancés.

  5. Sélectionnez Créer pour créer la connexion de données.

Étape 3 : Tester la connexion de données

  1. Dans le conteneur Cosmos DB, insérez le document suivant :

    {
        "name":"Cousteau"
    }
    
  2. Dans l’interface utilisateur web Azure Data Explorer, exécutez la requête suivante :

    TestTable
    

    Le jeu de résultats doit ressembler à l’image suivante :

    Capture d’écran du volet de résultats, montrant le document ingéré.

Notes

Azure Data Explorer dispose d’une stratégie d’agrégation (traitement par lots) pour l’ingestion de données en file d’attente conçue pour optimiser le processus d’ingestion. La stratégie de traitement par lots par défaut est configurée pour sceller un lot une fois que l’une des conditions suivantes est remplie pour le lot : un délai maximal de 5 minutes, une taille totale d’un Go ou 1 000 objets blob. Il existe donc un risque de latence. Pour plus d’informations, consultez la stratégie de traitement par lot. Pour réduire la latence, configurez votre table pour prendre en charge la diffusion en continu. Consultez la stratégie de diffusion en continu.

Considérations

Les considérations suivantes s’appliquent au flux de modification Cosmos DB :

  • Le flux de modification n’expose pas les événements de suppression .

    Le flux de modification Cosmos DB inclut uniquement les documents nouveaux et mis à jour. Si vous avez besoin de connaître les documents supprimés, vous pouvez configurer votre flux à l’aide d’un marqueur réversible pour marquer un document Cosmos DB comme supprimé. Une propriété est ajoutée pour mettre à jour les événements qui indiquent si un document a été supprimé. Vous pouvez ensuite utiliser l’opérateur where dans vos requêtes pour les filtrer.

    Par exemple, si vous mappez la propriété supprimée à une colonne de table appelée IsDeleted, vous pouvez filtrer les documents supprimés avec la requête suivante :

    TestTable
    | where not(IsDeleted)
    
  • Le flux de modification expose uniquement la dernière mise à jour d’un document.

    Pour comprendre la conséquence de la deuxième considération, examinez le scénario suivant :

    Un conteneur Cosmos DB contient les documents A et B. Les modifications apportées à une propriété appelée foo sont indiquées dans le tableau suivant :

    ID du document Foo de propriété Événement Horodatage de document (_ts)
    A Rouge Création 10
    B Bleu Création 20
    A Orange Update 30
    A Pink Mettre à jour 40
    B Violette Update 50
    A Carmine Update 50
    B NeonBlue Update 70

    L’API de flux de modification est interrogée par le connecteur de données à intervalles réguliers, généralement toutes les quelques secondes. Chaque sondage contient les modifications qui se sont produites dans le conteneur entre les appels, mais uniquement la dernière version des modifications par document.

    Pour illustrer le problème, envisagez une séquence d’appels d’API avec les horodatages 15, 35, 55 et 75 , comme indiqué dans le tableau suivant :

    Horodatage des appels d’API ID du document Foo de propriété Horodatage de document (_ts)
    15 A Rouge 10
    35 B Bleu 20
    35 A Orange 30
    55 B Violet 50
    55 A Carmine 60
    75 B NeonBlue 70

    En comparant les résultats de l’API à la liste des modifications apportées dans le document Cosmos DB, vous remarquerez qu’ils ne correspondent pas. L’événement de mise à jour du document A, mis en surbrillance dans la table de modifications à l’horodatage 40, n’apparaît pas dans les résultats de l’appel d’API.

    Pour comprendre pourquoi l’événement n’apparaît pas, nous allons examiner les modifications apportées au document A entre les appels d’API aux horodatages 35 et 55. Entre ces deux appels, le document A a changé deux fois, comme suit :

    ID du document Propriété foo Événement Horodatage du document (_ts)
    A Pink Update 40
    A Carmine Update 50

    Lorsque l’appel d’API à l’horodatage 55 est effectué, l’API de flux de modification retourne la dernière version du document. Dans ce cas, la dernière version du document A est la mise à jour à l’horodatage 50, qui est la mise à jour de la propriété foo de Pink à Carmine.

    En raison de ce scénario, le connecteur de données peut manquer certaines modifications de document intermédiaires. Par exemple, certains événements peuvent être manqués si le service de connexion de données est arrêté pendant quelques minutes ou si la fréquence des modifications de document est supérieure à la fréquence d’interrogation de l’API. Toutefois, l’état le plus récent de chaque document est capturé.

  • La suppression et la recréation d’un conteneur Cosmos DB ne sont pas prises en charge

    Azure Data Explorer effectue le suivi du flux de modification en pointant la « position » à laquelle il se trouve dans le flux. Cette opération est effectuée à l’aide d’un jeton de continuation sur chaque partition physique du conteneur. Lorsqu’un conteneur est supprimé/recréé, ces jetons de continuation ne sont pas valides et ne sont pas réinitialisés : vous devez supprimer et recréer la connexion de données.

Estimer les coûts

Quel est l’impact de l’utilisation de la connexion de données Cosmos DB sur l’utilisation des unités de requête (RU) de votre conteneur Cosmos DB ?

Le connecteur appelle l’API de flux de modification Cosmos DB sur chaque partition physique de votre conteneur, jusqu’à une fois par seconde. Les coûts suivants sont associés à ces appels :

Coût Description
Coûts fixes Les coûts fixes sont d’environ 2 RU par partition physique chaque seconde.
Coûts variables Les coûts variables représentent environ 2 % des RU utilisées pour écrire des documents, bien que cela puisse varier en fonction de votre scénario. Par exemple, si vous écrivez 100 documents dans un conteneur Cosmos DB, le coût d’écriture de ces documents est de 1 000 RU. Le coût correspondant pour l’utilisation du connecteur pour lire ces documents est d’environ 2 % du coût de leur écriture, soit environ 20 RU.