Ingérer des exemples de données au format JSON dans Azure Data Explorer
Article
Cet article vous montre comment ingérer des données au format JSON dans une base de données Azure Data Explorer. Vous commencerez par des exemples simples de données JSON brutes et mappées, puis vous passerez à des données JSON multilignes, et enfin à des schémas JSON plus complexes contenant des tableaux et des dictionnaires. Les exemples détaillent le processus d’ingestion de données au format JSON à l’aide de Langage de requête Kusto (KQL), C# ou Python.
Notes
Nous vous déconseillons d’utiliser .ingest des commandes de gestion dans les scénarios de production. Utilisez plutôt un connecteur de données ou ingérer des données par programmation à l’aide de l’une des bibliothèques clientes Kusto.
Prérequis
Un compte Microsoft ou une identité d’utilisateur Microsoft Entra. Un abonnement Azure n’est pas requis.
Azure Data Explorer prend en charge deux formats de fichier JSON :
json: JSON séparé par une ligne. Chaque ligne des données d’entrée contient exactement un enregistrement JSON. Ce format prend en charge l’analyse des commentaires et des propriétés entre guillemets uniques. Pour plus d’informations, consultez JSON Lines.
multijson: JSON multiligne. L’analyseur ignore les séparateurs de ligne et lit un enregistrement de la position précédente jusqu’à la fin d’un JSON valide.
Notes
Lors de l’ingestion à l’aide de l’Assistant Ingestion, le format par défaut est multijson. Le format peut gérer les enregistrements JSON multilignes et les tableaux d’enregistrements JSON. Lorsqu’une erreur d’analyse est rencontrée, le fichier entier est ignoré. Pour ignorer les enregistrements JSON non valides, sélectionnez l’option « Ignorer les erreurs de format de données », ce qui basculera le format vers json (lignes JSON).
Si vous utilisez le format de ligne JSON (json), les lignes qui ne représentent pas d’enregistrements JSON valides sont ignorées pendant l’analyse.
Ingérer et mapper des données au format JSON
L’ingestion de données au format JSON vous oblige à spécifier le format à l’aide de la propriété d’ingestion. L’ingestion de données JSON requiert un mappage, lequel mappe une entrée de source JSON à sa colonne cible. Lors d’une ingestion de données, utilisez la propriété IngestionMapping avec sa propriété d’ingestion ingestionMappingReference (pour un mappage prédéfini) ou sa propriété IngestionMappings. Cet article utilise la propriété d’ingestion ingestionMappingReference, qui est prédéfinie sur la table utilisée pour l’ingestion. Dans les exemples ci-dessous, nous allons commencer par ingérer des enregistrements JSON en tant que données brutes dans une table à une seule colonne. Nous utiliserons ensuite le mappage pour ingérer chaque propriété dans sa colonne mappée.
Exemple JSON simple
L’exemple suivant est un JSON simple, avec une structure plate. Les données comportent des informations sur la température et l’humidité, collectées par plusieurs appareils. Chaque enregistrement est marqué d’un ID et d’un timestamp.
Dans cet exemple, vous ingérez des enregistrements JSON en tant que données brutes dans une table à une seule colonne. La manipulation des données, l’utilisation de requêtes et la stratégie de mise à jour sont effectuées une fois que les données sont ingérées.
Dans la boîte de dialogue Ajouter un cluster, entrez l’URL de votre cluster sous la forme https://<ClusterName>.<Region>.kusto.windows.net/, puis sélectionnez Ajouter.
Collez la commande suivante, puis sélectionnez Exécuter pour créer la table.
.create table RawEvents (Event: dynamic)
Cette requête crée la table munie d’une seule colonne Event d’un type de donnéesdynamique.
Cette commande crée un mappage et mappe le chemin d’accès racine JSON $ à la colonne Event.
Ingérez des données dans la table RawEvents.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Utilisez C# pour ingérer des données dans un format JSON brut.
Créez la table RawEvents.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Créez une nouvelle table, avec un schéma similaire aux données d’entrée JSON. Nous utiliserons cette table pour tous les exemples et commandes d’ingestion suivants.
Dans ce mappage, comme défini par le schéma de la table, les entrées timestamp sont ingérées dans la colonne Time en tant que types de données datetime.
Ingérez des données dans la table Events.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
Le fichier « simple.json » comporte quelques enregistrements JSON séparés par des lignes. Le format est json, et le mappage utilisé dans la commande d’ingestion est le FlatEventMapping que vous avez créé.
Créez une nouvelle table, avec un schéma similaire aux données d’entrée JSON. Nous utiliserons cette table pour tous les exemples et commandes d’ingestion suivants.
Dans ce mappage, comme défini par le schéma de la table, les entrées timestamp sont ingérées dans la colonne Time en tant que types de données datetime.
Ingérez des données dans la table Events.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Le fichier « simple.json » comporte quelques enregistrements JSON séparés par des lignes. Le format est json, et le mappage utilisé dans la commande d’ingestion est le FlatEventMapping que vous avez créé.
Créez une nouvelle table, avec un schéma similaire aux données d’entrée JSON. Nous utiliserons cette table pour tous les exemples et commandes d’ingestion suivants.
Le fichier « simple.json » comporte quelques enregistrements JSON séparés par des lignes. Le format est json, et le mappage utilisé dans la commande d’ingestion est le FlatEventMapping que vous avez créé.
Ingérer des enregistrements JSON multilignes
Dans cet exemple, vous ingérez des enregistrements JSON multilignes. Chaque propriété JSON est mappée à une colonne unique de la table. Le fichier « multilined.json » comporte quelques enregistrements JSON mis en retrait. Le format multijson indique la lecture des enregistrements par la structure JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Ingérez des données dans la table Events.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Ingérer des enregistrements JSON contenant des tableaux
Les données de type tableau sont des collections ordonnées de valeurs. L’ingestion d’un tableau JSON est effectuée par une stratégie de mise à jour. Le JSON est ingéré tel quel dans une table intermédiaire. Une stratégie de mise à jour exécute une fonction prédéfinie sur la table RawEvents, en ingérant de nouveau les résultats dans la table cible. Nous allons ingérer les données avec la structure suivante :
Créez une fonction update policy qui développe la collection de records pour que chaque valeur de la collection reçoive une ligne distincte, à l’aide de l’opérateur mv-expand. Nous utiliserons la table RawEvents en tant que table source et Events comme table cible.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Le schéma reçu par la fonction doit correspondre au schéma de la table cible. Utilisez l’opérateur getschema pour examiner le schéma.
EventRecordsExpand() | getschema
Ajoutez la stratégie de mise à jour à la table cible. Cette stratégie exécute automatiquement la requête sur toutes les nouvelles données ingérées dans la table intermédiaire RawEvents et ingère ses résultats dans la table Events. Définissez une stratégie de rétention zéro pour éviter la persistance de la table intermédiaire.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Examinez les données dans la table Events.
Events
Créez une fonction de mise à jour qui développe la collection de records pour que chaque valeur de la collection reçoive une ligne distincte, à l’aide de l’opérateur mv-expand. Nous utiliserons la table RawEvents en tant que table source et Events comme table cible.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Notes
Le schéma reçu par la fonction doit correspondre au schéma de la table cible.
Ajoutez la stratégie de mise à jour à la table cible. Cette stratégie exécute automatiquement la requête sur toutes les nouvelles données ingérées dans la table intermédiaire RawEvents et ingère ses résultats dans la table Events. Définissez une stratégie de rétention zéro pour éviter la persistance de la table intermédiaire.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Examinez les données dans la table Events.
Créez une fonction de mise à jour qui développe la collection de records pour que chaque valeur de la collection reçoive une ligne distincte, à l’aide de l’opérateur mv-expand. Nous utiliserons la table RawEvents en tant que table source et Events comme table cible.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Notes
Le schéma reçu par la fonction doit correspondre au schéma de la table cible.
Ajoutez la stratégie de mise à jour à la table cible. Cette stratégie exécute automatiquement la requête sur toutes les nouvelles données ingérées dans la table intermédiaire RawEvents et ingère ses résultats dans la table Events. Définissez une stratégie de rétention zéro pour éviter la persistance de la table intermédiaire.