Questo articolo illustra come inserire dati in formato JSON in un database Esplora dati di Azure. Si inizierà con semplici esempi di JSON non elaborati e mappati, si continuerà a json multipiaggiata e quindi si affronteranno schemi JSON più complessi contenenti matrici e dizionari. Gli esempi illustrano in dettaglio il processo di inserimento di dati in formato JSON usando Linguaggio di query Kusto (KQL), C#o Python.
Nota
Non è consigliabile usare .ingest
i comandi di gestione negli scenari di produzione. Usare invece un connettore dati o inserire dati a livello di codice usando una delle librerie client Kusto.
Prerequisiti
- Un account Microsoft o un'identità utente di Microsoft Entra. Non è necessaria una sottoscrizione di Azure.
- Un cluster e un database di Esplora dati di Azure. Creare un cluster e un database.
Azure Esplora dati supporta due formati di file JSON:
json
: JSON separato da righe. Ogni riga nei dati di input ha esattamente un record JSON. Questo formato supporta l'analisi dei commenti e delle proprietà tra virgolette singole. Per altre informazioni, vedere Righe JSON.
multijson
: JSON con più righe. Il parser ignora i separatori di riga e legge un record dalla posizione precedente alla fine di un json valido.
Nota
Quando si inserisce usando l'esperienza di recupero dei dati, il formato predefinito è multijson
. Il formato può gestire record JSON su più righe e matrici di record JSON. Quando viene rilevato un errore di analisi, l'intero file viene rimosso. Per ignorare i record JSON non validi, selezionare l'opzione "Ignora errori di formato dati". In questo modo il formato verrà commutato su json
(righe JSON).
Se si usa il formato di riga JSON (json
), le righe che non rappresentano un record JSON valido vengono ignorate durante l'analisi.
L'inserimento di dati in formato JSON richiede di specificare il formato usando la proprietà di inserimento. L'inserimento di dati JSON richiede il mapping, che esegue il mapping di una voce di origine JSON alla colonna di destinazione. Quando si inseriscono dati, usare la proprietà con la IngestionMapping
relativa ingestionMappingReference
proprietà di inserimento (per un mapping predefinito) o la relativa IngestionMappings
proprietà. Questo articolo userà la ingestionMappingReference
proprietà di inserimento, definita in modo predefinito nella tabella usata per l'inserimento. Negli esempi seguenti si inizierà inserendo record JSON come dati non elaborati in una singola tabella di colonne. Si userà quindi il mapping per inserire ogni proprietà nella colonna mappata.
Esempio JSON semplice
L'esempio seguente è un codice JSON semplice, con una struttura piatta. I dati contengono informazioni sulla temperatura e sull'umidità, raccolte da diversi dispositivi. Ogni record è contrassegnato con un ID e un timestamp.
{
"timestamp": "2019-05-02 15:23:50.0369439",
"deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
}
Inserire record JSON non elaborati
In questo esempio i record JSON vengono inseriti come dati non elaborati in una singola tabella di colonne. La manipolazione dei dati, l'uso di query e i criteri di aggiornamento vengono eseguiti dopo l'inserimento dei dati.
Usare Linguaggio di query Kusto per inserire dati in un formato JSON non elaborato.
Accedere a https://dataexplorer.azure.com.
Selezionare Add cluster (Aggiungi cluster).
Nella finestra di dialogo Add cluster (Aggiungi cluster) immettere l'URL del cluster nel modulohttps://<ClusterName>.<Region>.kusto.windows.net/
, quindi selezionare Aggiungi.
Incollare il comando seguente e selezionare Esegui per creare una tabella.
.create table RawEvents (Event: dynamic)
Questa query crea una tabella con una singola Event
colonna di un tipo di dati dinamico .
Creare il mapping JSON.
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
Questo comando crea un mapping ed esegue il mapping del percorso $
radice JSON alla Event
colonna.
Inserire i dati nella RawEvents
tabella.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Usare C# per inserire dati in formato JSON non elaborato.
Creare la RawEvents
tabella.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Creare il mapping JSON.
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Events", Properties = new Dictionary<string, string> { { "path", "$" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Questo comando crea un mapping ed esegue il mapping del percorso $
radice JSON alla Event
colonna.
Inserire i dati nella RawEvents
tabella.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Nota
I dati vengono aggregati in base ai criteri di invio in batch, con conseguente latenza di alcuni minuti.
Usare Python per inserire dati in formato JSON non elaborato.
Creare la RawEvents
tabella.
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Events: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Creare il mapping JSON.
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","path":"$"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Inserire i dati nella RawEvents
tabella.
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Nota
I dati vengono aggregati in base ai criteri di invio in batch, con conseguente latenza di alcuni minuti.
Inserire record JSON mappati
In questo esempio si inseriscono i dati dei record JSON. Ogni proprietà JSON viene mappata a una singola colonna della tabella.
Creare una nuova tabella con uno schema simile ai dati di input JSON. Questa tabella verrà usata per tutti gli esempi e i comandi di inserimento seguenti.
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
Creare il mapping JSON.
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
In questo mapping, come definito dallo schema della tabella, le timestamp
voci verranno inserite nella colonna Time
come datetime
tipi di dati.
Inserire i dati nella Events
tabella.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
Il file 'simple.json' include alcuni record JSON separati da righe. Il formato è json
e il mapping usato nel comando di inserimento è quello FlatEventMapping
creato.
Creare una nuova tabella con uno schema simile ai dati di input JSON. Questa tabella verrà usata per tutti gli esempi e i comandi di inserimento seguenti.
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("Time", "System.DateTime"),
Tuple.Create("Device", "System.String"),
Tuple.Create("MessageId", "System.String"),
Tuple.Create("Temperature", "System.Double"),
Tuple.Create("Humidity", "System.Double")
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Creare il mapping JSON.
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
In questo mapping, come definito dallo schema della tabella, le timestamp
voci verranno inserite nella colonna Time
come datetime
tipi di dati.
Inserire i dati nella Events
tabella.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Il file 'simple.json' include alcuni record JSON separati da righe. Il formato è json
e il mapping usato nel comando di inserimento è quello FlatEventMapping
creato.
Creare una nuova tabella con uno schema simile ai dati di input JSON. Questa tabella verrà usata per tutti gli esempi e i comandi di inserimento seguenti.
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Creare il mapping JSON.
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Inserire i dati nella Events
tabella.
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Il file 'simple.json' ha alcuni record JSON separati da righe. Il formato è json
e il mapping usato nel comando di inserimento è quello FlatEventMapping
creato.
Inserire record JSON con più righe
In questo esempio si inseriscono record JSON con più righe. Ogni proprietà JSON viene mappata a una singola colonna della tabella. Il file 'multilined.json' ha alcuni record JSON rientrati. Il formato multijson
indica di leggere i record in base alla struttura JSON.
Inserire i dati nella Events
tabella.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Inserire i dati nella Events
tabella.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Inserire i dati nella Events
tabella.
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Inserire record JSON contenenti matrici
I tipi di dati matrice sono una raccolta ordinata di valori. L'inserimento di una matrice JSON viene eseguito da un criterio di aggiornamento. Il codice JSON viene inserito così come è in una tabella intermedia. Un criterio di aggiornamento esegue una funzione predefinita nella RawEvents
tabella, rivalutando i risultati nella tabella di destinazione. I dati verranno inseriti con la struttura seguente:
{
"records":
[
{
"timestamp": "2019-05-02 15:23:50.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
},
{
"timestamp": "2019-05-02 15:23:51.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "57de2821-7581-40e4-861e-ea3bde102364",
"temperature": 33.7529423105311,
"humidity": 75.4787976739364
}
]
}
Creare una update policy
funzione che espande la raccolta di records
in modo che ogni valore nella raccolta riceva una riga separata, usando l'operatore mv-expand
. La tabella RawEvents
verrà usata come tabella di origine e Events
come tabella di destinazione.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Lo schema ricevuto dalla funzione deve corrispondere allo schema della tabella di destinazione. Usare l'operatore getschema
per esaminare lo schema.
EventRecordsExpand() | getschema
Aggiungere il criterio di aggiornamento nella tabella di destinazione. Questo criterio eseguirà automaticamente la query su tutti i dati appena inseriti nella RawEvents
tabella intermedia e inserisce i risultati nella Events
tabella. Definire un criterio di conservazione zero per evitare di rendere persistente la tabella intermedia.
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
Inserire i dati nella RawEvents
tabella.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Esaminare i dati nella Events
tabella.
Events
Creare una funzione di aggiornamento che espande la raccolta di records
in modo che ogni valore nella raccolta riceva una riga separata, usando l'operatore mv-expand
. La tabella RawEvents
verrà usata come tabella di origine e Events
come tabella di destinazione.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Nota
Lo schema ricevuto dalla funzione deve corrispondere allo schema della tabella di destinazione.
Aggiungere il criterio di aggiornamento nella tabella di destinazione. Questo criterio eseguirà automaticamente la query su tutti i dati appena inseriti nella RawEvents
tabella intermedia e inserisce i risultati nella Events
tabella. Definire un criterio di conservazione zero per evitare di rendere persistente la tabella intermedia.
command = ".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]";
await kustoClient.ExecuteControlCommandAsync(command);
Inserire i dati nella RawEvents
tabella.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Esaminare i dati nella Events
tabella.
Creare una funzione di aggiornamento che espande la raccolta di records
in modo che ogni valore nella raccolta riceva una riga separata, usando l'operatore mv-expand
. La tabella RawEvents
verrà usata come tabella di origine e Events
come tabella di destinazione.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Nota
Lo schema ricevuto dalla funzione deve corrispondere allo schema della tabella di destinazione.
Aggiungere il criterio di aggiornamento nella tabella di destinazione. Questo criterio eseguirà automaticamente la query su tutti i dati appena inseriti nella RawEvents
tabella intermedia e inserisce i risultati nella Events
tabella. Definire un criterio di conservazione zero per evitare di rendere persistente la tabella intermedia.
CREATE_UPDATE_POLICY_COMMAND =
""".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Inserire i dati nella RawEvents
tabella.
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Esaminare i dati nella Events
tabella.
Contenuto correlato