Ingerir dados de exemplo formatados JSON no Azure Data Explorer
Artigo
Este artigo mostra-lhe como ingerir dados formatados JSON numa base de dados do Azure Data Explorer. Irá começar com exemplos simples de JSON não processado e mapeado, continuar para JSON multi-forrado e, em seguida, abordar esquemas JSON mais complexos que contenham matrizes e dicionários. Os exemplos detalham o processo de ingestão de dados formatados JSON com Linguagem de Pesquisa Kusto (KQL), C#ou Python.
Nota
Não recomendamos a utilização de .ingest comandos de gestão em cenários de produção. Em vez disso, utilize um conector de dados ou ingera dados através de programação através de uma das bibliotecas de cliente kusto.
Pré-requisitos
Uma conta Microsoft ou uma identidade de utilizador Microsoft Entra. Não é necessária uma subscrição do Azure.
O Azure Data Explorer suporta dois formatos de ficheiro JSON:
json: JSON separado por linhas. Cada linha nos dados de entrada tem exatamente um registo JSON. Este formato suporta a análise de comentários e propriedades com uma única citação. Para obter mais informações, veja Linhas JSON.
multijson: JSON multi-forrado. O analisador ignora os separadores de linha e lê um registo da posição anterior até ao fim de um JSON válido.
Nota
Ao ingerir com o assistente de ingestão, o formato predefinido é multijson. O formato pode processar registos JSON multiline e matrizes de registos JSON. Quando é encontrado um erro de análise, todo o ficheiro é eliminado. Para ignorar registos JSON inválidos, selecione a opção "Ignorar erros de formato de dados", que mudará o formato para json (Linhas JSON).
Se estiver a utilizar o formato de Linha JSON (json), as linhas que não representam registos JSON válidos são ignoradas durante a análise.
Ingerir e mapear dados formatados JSON
A ingestão de dados formatados JSON requer que especifique o formato com a propriedade ingestão. A ingestão de dados JSON requer mapeamento, que mapeia uma entrada de origem JSON para a coluna de destino. Ao ingerir dados, utilize a propriedade com a IngestionMapping respetiva ingestionMappingReference propriedade de ingestão (para um mapeamento predefinido) ou a respetiva IngestionMappings propriedade. Este artigo utilizará a ingestionMappingReference propriedade ingestão, que é predefinida na tabela utilizada para ingestão. Nos exemplos abaixo, vamos começar por ingerir registos JSON como dados não processados numa única tabela de colunas. Em seguida, vamos utilizar o mapeamento para ingerir cada propriedade na coluna mapeada.
Exemplo JSON simples
O exemplo seguinte é um JSON simples, com uma estrutura plana. Os dados têm informações de temperatura e humidade, recolhidas por vários dispositivos. Cada registo é marcado com um ID e carimbo de data/hora.
Neste exemplo, irá ingerir registos JSON como dados não processados numa única tabela de colunas. A manipulação de dados, a utilização de consultas e a política de atualização são efetuadas após a ingestão dos dados.
Na caixa de diálogo Adicionar cluster , introduza o URL do cluster no formulário https://<ClusterName>.<Region>.kusto.windows.net/e, em seguida, selecione Adicionar.
Cole o seguinte comando e selecione Executar para criar a tabela.
.create table RawEvents (Event: dynamic)
Esta consulta cria uma tabela com uma única Event coluna de um tipo de dados dinâmico .
Este comando cria um mapeamento e mapeia o caminho $ de raiz JSON para a Event coluna.
Ingerir dados na RawEvents tabela.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Utilize C# para ingerir dados em formato JSON não processado.
Crie a RawEvents tabela.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos utilizar esta tabela para todos os seguintes exemplos e comandos de ingestão.
Neste mapeamento, conforme definido pelo esquema da tabela, as timestamp entradas serão ingeridas na coluna Time como datetime tipos de dados.
Ingerir dados na Events tabela.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
O ficheiro "simple.json" tem alguns registos JSON separados por linhas. O formato é json, e o mapeamento utilizado no comando ingestão é o FlatEventMapping que criou.
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos utilizar esta tabela para todos os seguintes exemplos e comandos de ingestão.
Neste mapeamento, conforme definido pelo esquema da tabela, as timestamp entradas serão ingeridas na coluna Time como datetime tipos de dados.
Ingerir dados na Events tabela.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
O ficheiro "simple.json" tem alguns registos JSON separados por linhas. O formato é json, e o mapeamento utilizado no comando ingestão é o FlatEventMapping que criou.
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos utilizar esta tabela para todos os seguintes exemplos e comandos de ingestão.
O ficheiro "simple.json" tem alguns registos JSON separados por linhas. O formato é json, e o mapeamento utilizado no comando ingestão é o FlatEventMapping que criou.
Ingerir registos JSON multilindrados
Neste exemplo, irá ingerir registos JSON multilindrados. Cada propriedade JSON é mapeada para uma única coluna na tabela. O ficheiro "multilined.json" tem alguns registos JSON com avanço. O formato multijson indica para ler registos pela estrutura JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Ingerir dados na Events tabela.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Os tipos de dados de matriz são uma coleção ordenada de valores. A ingestão de uma matriz JSON é feita por uma política de atualização. O JSON é ingerido tal como está numa tabela intermédia. Uma política de atualização executa uma função predefinida na RawEvents tabela, reestando os resultados para a tabela de destino. Iremos ingerir dados com a seguinte estrutura:
Crie uma função update policy que expanda a coleção de records para que cada valor na coleção receba uma linha separada, utilizando o mv-expand operador. Vamos utilizar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
O esquema recebido pela função tem de corresponder ao esquema da tabela de destino. Utilize getschema o operador para rever o esquema.
EventRecordsExpand() | getschema
Adicione a política de atualização à tabela de destino. Esta política irá executar automaticamente a consulta em quaisquer dados recentemente ingeridos na RawEvents tabela intermédia e ingerir os resultados na Events tabela. Defina uma política de retenção zero para evitar manter a tabela intermédia.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Reveja os dados na Events tabela.
Events
Crie uma função de atualização que expanda a coleção de records para que cada valor na coleção receba uma linha separada, utilizando o mv-expand operador. Vamos utilizar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Nota
O esquema recebido pela função tem de corresponder ao esquema da tabela de destino.
Adicione a política de atualização à tabela de destino. Esta política irá executar automaticamente a consulta em quaisquer dados recentemente ingeridos na RawEvents tabela intermédia e ingerir os respetivos resultados na Events tabela. Defina uma política de retenção zero para evitar manter a tabela intermédia.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Reveja os dados na Events tabela.
Crie uma função de atualização que expanda a coleção de records para que cada valor na coleção receba uma linha separada, utilizando o mv-expand operador. Vamos utilizar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Nota
O esquema recebido pela função tem de corresponder ao esquema da tabela de destino.
Adicione a política de atualização à tabela de destino. Esta política irá executar automaticamente a consulta em quaisquer dados recentemente ingeridos na RawEvents tabela intermédia e ingerir os respetivos resultados na Events tabela. Defina uma política de retenção zero para evitar manter a tabela intermédia.