Ingest JSON formatted sample data into Azure Data Explorer
This article shows you how to ingest JSON formatted data into an Azure Data Explorer database. You'll start with simple examples of raw and mapped JSON, continue to multi-lined JSON, and then tackle more complex JSON schemas containing arrays and dictionaries. The examples detail the process of ingesting JSON formatted data using Kusto Query Language (KQL), C#, or Python.
Note
We don't recommend using .ingest management commands in production scenarios. Instead, use a data connector or programmatically ingest data using one of the Kusto client libraries.
Prerequisites
A Microsoft account or a Microsoft Entra user identity. An Azure subscription isn't required.
Azure Data Explorer supports two JSON file formats:
json: Line separated JSON. Each line in the input data has exactly one JSON record. This format supports parsing of comments and single-quoted properties. For more information, see JSON Lines.
multijson: Multi-lined JSON. The parser ignores the line separators and reads a record from the previous position to the end of a valid JSON.
Note
When ingesting using the get data experience, the default format is multijson. The format can handle multiline JSON records and arrays of JSON records. When a parsing error is encountered, the entire file is discarded. To ignore invalid JSON records, select the "Ignore data format errors" option, which switches the format to json (JSON Lines).
If you're using the JSON Lines format (json), lines that don't represent valid JSON records are skipped during parsing.
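The difference between the two formats can be sketched in plain Python. This is only an illustration of the behavior described above, not the Azure Data Explorer parser: the function names and sample strings are hypothetical, and the standard `json` module doesn't accept comments or single-quoted properties.

```python
import json


def parse_json_lines(text):
    """Mimic the `json` format: one record per line, invalid lines are skipped."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # a line that isn't a valid JSON record is skipped
    return records


def parse_multijson(text):
    """Mimic the `multijson` format: read one JSON value after another, ignoring line breaks."""
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # Skip whitespace (including line separators) between records.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        # raw_decode raises json.JSONDecodeError on invalid input,
        # which here corresponds to discarding the whole file.
        value, pos = decoder.raw_decode(text, pos)
        if isinstance(value, list):
            records.extend(value)  # an array of JSON records
        else:
            records.append(value)
    return records


# Hypothetical sample inputs.
line_separated = (
    '{"deviceId": "a", "temperature": 31.0}\n'
    "not json\n"
    '{"deviceId": "b", "temperature": 29.5}\n'
)
indented = (
    '{\n  "deviceId": "a",\n  "temperature": 31.0\n}\n'
    '{\n  "deviceId": "b",\n  "temperature": 29.5\n}\n'
)

json_records = parse_json_lines(line_separated)
multijson_records = parse_multijson(indented)
```

Both calls return the same two records. Passing the indented text to `parse_json_lines` returns no records, because no single line of it is a complete JSON record.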
Ingest and map JSON formatted data
Ingesting JSON formatted data requires you to specify the format using an ingestion property. It also requires a mapping, which maps each JSON source entry to its target column. When ingesting data, set the IngestionMapping property using either its ingestionMappingReference ingestion property (for a pre-defined mapping) or its IngestionMappings property. This article uses the ingestionMappingReference ingestion property, which refers to a mapping that is pre-defined on the table used for ingestion. In the examples below, we'll start by ingesting JSON records as raw data into a single-column table. Then we'll use the mapping to ingest each property into its mapped column.
Simple JSON example
The following example is a simple JSON, with a flat structure. The data has temperature and humidity information, collected by several devices. Each record is marked with an ID and timestamp.
In this example, you ingest JSON records as raw data into a single-column table. Data manipulation, using queries and an update policy, is done after the data is ingested.
This command creates a mapping, and maps the JSON root path $ to the Event column.
Ingest data into the RawEvents table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
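To make the role of a mapping concrete, here is a minimal Python sketch of how a list of column/path pairs projects one JSON record onto table columns. The mapping layout, the `$.property` paths, the helper names, and the sample values are assumptions made for illustration; the column and property names are the ones used by the Events examples in this article.

```python
import json
from datetime import datetime

# Assumed shape of the two mappings: each entry pairs a column with a JSON path.
RAW_EVENT_MAPPING = [{"column": "Event", "Properties": {"path": "$"}}]
FLAT_EVENT_MAPPING = [
    {"column": "Time", "Properties": {"path": "$.timestamp"}},
    {"column": "Device", "Properties": {"path": "$.deviceId"}},
    {"column": "MessageId", "Properties": {"path": "$.messageId"}},
    {"column": "Temperature", "Properties": {"path": "$.temperature"}},
    {"column": "Humidity", "Properties": {"path": "$.humidity"}},
]


def resolve_path(record, path):
    """Resolve '$' (the root) or a simple '$.a.b' path; return None if a key is missing."""
    if path == "$":
        return record
    value = record
    for key in path[2:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def apply_mapping(record, mapping):
    """Build one row (column name -> value) from one JSON record."""
    return {
        entry["column"]: resolve_path(record, entry["Properties"]["path"])
        for entry in mapping
    }


# Hypothetical record with the same property names as the sample data.
sample = {
    "timestamp": "2019-05-02 15:23:50",
    "deviceId": "device-1",
    "messageId": "message-1",
    "temperature": 31.0,
    "humidity": 62.0,
}

raw_row = apply_mapping(sample, RAW_EVENT_MAPPING)    # whole record in one column
flat_row = apply_mapping(sample, FLAT_EVENT_MAPPING)  # one property per column
# The Time column is typed datetime, so the timestamp text is converted.
flat_row["Time"] = datetime.fromisoformat(flat_row["Time"])

# The mapping itself is plain JSON when serialized.
mapping_json = json.dumps(FLAT_EVENT_MAPPING)
```

With the raw mapping, the root path `$` places the entire record in the Event column. With the flat mapping, each property lands in its own typed column.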
Ingest data into the Events table.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
Ingest data into the Events table.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.json,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Ingest multi-lined JSON records
In this example, you ingest multi-lined JSON records. Each JSON property is mapped to a single column in the table. The file 'multilined.json' has a few indented JSON records. The multijson format tells the parser to read records by their JSON structure rather than line by line.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Ingest data into the Events table.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.multijson,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
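Ingest JSON records containing arrays
Array data types are an ordered collection of values. A JSON array is ingested by using an update policy: the JSON is ingested as-is into an intermediate table (RawEvents), and the update policy runs a pre-defined function on that table, reingesting the results into the target table. The data ingested in this example has a top-level records array, and each element of the array carries timestamp, deviceId, messageId, temperature, and humidity properties.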
Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}
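The effect of this function can be sketched in plain Python: every element of the records array in a raw event becomes its own row, with each property converted to the target column type. This is an illustration only, with hypothetical sample values; it doesn't model how mv-expand treats null or empty arrays.

```python
from datetime import datetime


def event_records_expand(raw_events):
    """Turn each element of Event.records into one row shaped like the Events table."""
    rows = []
    for raw in raw_events:
        for record in raw["Event"]["records"]:  # mv-expand records = Event.records
            rows.append({
                "Time": datetime.fromisoformat(record["timestamp"]),  # todatetime
                "Device": str(record["deviceId"]),                    # tostring
                "MessageId": str(record["messageId"]),                # tostring
                "Temperature": float(record["temperature"]),          # todouble
                "Humidity": float(record["humidity"]),                # todouble
            })
    return rows


# One hypothetical row of the RawEvents table: a single Event column
# holding a document with a records array.
raw_events = [
    {
        "Event": {
            "records": [
                {"timestamp": "2019-05-02 15:23:50", "deviceId": "device-1",
                 "messageId": "message-1", "temperature": 31, "humidity": 62},
                {"timestamp": "2019-05-02 15:24:50", "deviceId": "device-2",
                 "messageId": "message-2", "temperature": 29.5, "humidity": 70.5},
            ]
        }
    }
]

rows = event_records_expand(raw_events)
```

One raw row holding two array elements produces two rows in the output, each with the five columns of the target table.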
The schema received by the function must match the schema of the target table. Use the getschema operator to review the schema.
EventRecordsExpand() | getschema
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest the results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Review data in the Events table.
Events
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
    "EventRecordsExpand",
    "UpdateFunctions",
    string.Empty,
    null,
    // Expand the records array inside each raw event, as in the KQL version of this function.
    @"RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records['timestamp']),
        Device = tostring(records['deviceId']),
        MessageId = tostring(records['messageId']),
        Temperature = todouble(records['temperature']),
        Humidity = todouble(records['humidity'])",
    ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Note
The schema received by the function must match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.multijson,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Review data in the Events table.
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
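from azure.kusto.data.helpers import dataframe_from_result_table

# KUSTO_CLIENT and DATABASE are assumed to be defined earlier in your script.
# Expand the records array inside each raw event, as in the KQL version of this function.
CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])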
Note
The schema received by the function has to match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
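As a rough sketch of this step, the two management commands can be assembled as strings in Python. The command shapes and the policy property names below are assumptions based on the update policy and retention policy management commands; check them against the current command reference before running anything.

```python
import json

# Assumed update policy object: run EventRecordsExpand() on data arriving in RawEvents.
update_policy = [
    {
        "IsEnabled": True,
        "Source": "RawEvents",
        "Query": "EventRecordsExpand()",
    }
]

# Attach the policy to the target table (Events).
ALTER_UPDATE_POLICY_COMMAND = (
    ".alter table Events policy update @'" + json.dumps(update_policy) + "'"
)

# Zero retention on the intermediate table, so raw records aren't persisted.
ALTER_RETENTION_COMMAND = ".alter-merge table RawEvents policy retention softdelete = 0s"

commands = [ALTER_UPDATE_POLICY_COMMAND, ALTER_RETENTION_COMMAND]
```

Each string in `commands` could then be passed to `KUSTO_CLIENT.execute_mgmt(DATABASE, command)`, in the same way the create function command is executed in this article.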