Ingestování ukázkových dat ve formátu JSON do Azure Data Explorer
Článek
V tomto článku se dozvíte, jak ingestovat data ve formátu JSON do databáze Azure Data Explorer. Začnete jednoduchými příklady nezpracovaného a mapovaného kódu JSON, budete pokračovat víceřádkovým kódem JSON a pak se seznámíte se složitějšími schématy JSON obsahujícími pole a slovníky. Příklady podrobně uvádějí proces ingestování dat ve formátu JSON pomocí dotazovací jazyk Kusto (KQL), C# nebo Pythonu.
Poznámka
V produkčních scénářích nedoporučujeme používat .ingest příkazy pro správu. Místo toho použijte datový konektor nebo programově ingestujte data pomocí některé z klientských knihoven Kusto.
Požadavky
Účet Microsoft nebo Microsoft Entra identitu uživatele. Předplatné Azure není povinné.
Azure Data Explorer podporuje dva formáty souborů JSON:
json: Kód JSON oddělený řádky. Každý řádek ve vstupních datech obsahuje přesně jeden záznam JSON. Tento formát podporuje parsování komentářů a vlastností v jednoduchých uvozových uvozech. Další informace najdete v tématu Řádky JSON.
multijson: Json s více řádky. Analyzátor ignoruje oddělovače řádků a načte záznam z předchozí pozice na konec platného formátu JSON.
Poznámka
Při příjmu dat pomocí průvodce příjmem dat je multijsonvýchozí formát . Formát může zpracovávat víceřádkové záznamy JSON a pole záznamů JSON. Když dojde k chybě analýzy, celý soubor se zahodí. Pokud chcete ignorovat neplatné záznamy JSON, vyberte možnost Ignorovat chyby formátu dat. Tím se formát přepne na json (Řádky JSON).
Pokud používáte formát čáry JSON (json), čáry, které nepředstavují platné záznamy JSON, se během analýzy přeskočí.
Ingestování a mapování dat ve formátu JSON
Příjem dat ve formátu JSON vyžaduje, abyste zadali formát pomocí vlastnosti příjmu dat. Příjem dat JSON vyžaduje mapování, které mapuje zdrojovou položku JSON na cílový sloupec. Při ingestování dat použijte vlastnost s vlastností ingestionMappingReference příjmu IngestionMapping dat (pro předdefinované mapování) nebo její IngestionMappings vlastností. V tomto článku se použije vlastnost příjmu ingestionMappingReference dat, která je předem definovaná v tabulce používané k příjmu dat. V následujících příkladech začneme příjmem záznamů JSON jako nezpracovaných dat do tabulky s jedním sloupcem. Pak pomocí mapování ingestujeme každou vlastnost do namapovaného sloupce.
Příklad jednoduchého formátu JSON
Následující příklad je jednoduchý json s plochou strukturou. Data mají informace o teplotě a vlhkosti, které shromažďuje několik zařízení. Každý záznam je označený ID a časovým razítkem.
V tomto příkladu ingestujete záznamy JSON jako nezpracovaná data do tabulky s jedním sloupcem. Manipulace s daty, používání dotazů a aktualizace zásad se provádí po ingestování dat.
Tento příkaz vytvoří mapování a namapuje kořenovou cestu $ JSON ke sloupci Event .
Ingestujte data do RawEvents tabulky.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
K ingestování dat v nezpracované podobě JSON použijte jazyk C#.
RawEvents Vytvořte tabulku.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
V tomto mapování definovaném schématem timestamp tabulky se položky ingestují do sloupce Time jako datetime datové typy.
Ingestujte data do Events tabulky.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
Soubor simple.json obsahuje několik záznamů JSON oddělených řádky. Formát je jsona mapování použité v příkazu ingest je to, co FlatEventMapping jste vytvořili.
Vytvořte novou tabulku s podobným schématem jako vstupní data JSON. Tuto tabulku použijeme pro všechny následující příklady a příkazy ingestování.
V tomto mapování definovaném schématem timestamp tabulky se položky ingestují do sloupce Time jako datetime datové typy.
Ingestujte data do Events tabulky.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Soubor simple.json obsahuje několik záznamů JSON oddělených řádky. Formát je jsona mapování použité v příkazu ingest je to, co FlatEventMapping jste vytvořili.
Vytvořte novou tabulku s podobným schématem jako vstupní data JSON. Tuto tabulku použijeme pro všechny následující příklady a příkazy ingestování.
Soubor simple.json obsahuje několik záznamů JSON oddělených řádky. Formát je jsona mapování použité v příkazu ingest je to, co FlatEventMapping jste vytvořili.
Ingestování víceřádkových záznamů JSON
V tomto příkladu ingestujete víceřádkové záznamy JSON. Každá vlastnost JSON se mapuje na jeden sloupec v tabulce. Soubor multilined.json obsahuje několik odsazených záznamů JSON. Formát multijson indikuje, že se mají záznamy číst podle struktury JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Příjem dat do Events tabulky
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Datové typy pole jsou uspořádanou kolekcí hodnot. Příjem dat pole JSON se provádí pomocí zásad aktualizace. Json se ingestuje tak, jak je, do zprostředkující tabulky. Zásady aktualizace spustí v tabulce předdefinované funkce RawEvents a výsledky znovu ingestují do cílové tabulky. Ingestujeme data s následující strukturou:
Vytvořte update policy funkci, která rozbalí kolekci records tak, aby každá hodnota v kolekci obdržela samostatný řádek pomocí operátoru mv-expand . Tabulku RawEvents použijeme jako zdrojovou a Events jako cílovou tabulku.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Schéma přijaté funkcí musí odpovídat schématu cílové tabulky. Pomocí getschema operátoru zkontrolujte schéma.
EventRecordsExpand() | getschema
Přidejte zásadu aktualizace do cílové tabulky. Tato zásada automaticky spustí dotaz na všechna nově přijatá data v RawEvents zprostředkující tabulce a ingestuje výsledky do Events tabulky. Definujte zásady nulového uchovávání informací, abyste zabránili zachování zprostředkující tabulky.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Zkontrolujte data v tabulce Events .
Events
Vytvořte aktualizační funkci, která rozbalí kolekci records tak, aby každá hodnota v kolekci obdržela samostatný řádek pomocí operátoru mv-expand . Tabulku RawEvents použijeme jako zdrojovou a Events jako cílovou tabulku.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Poznámka
Schéma přijaté funkcí musí odpovídat schématu cílové tabulky.
Přidejte zásadu aktualizace do cílové tabulky. Tato zásada automaticky spustí dotaz na všechna nově přijatá data v RawEvents zprostředkující tabulce a ingestuje jeho výsledky do Events tabulky. Definujte zásady nulového uchovávání informací, abyste zabránili zachování zprostředkující tabulky.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Zkontrolujte data v tabulce Events .
Vytvořte aktualizační funkci, která rozbalí kolekci records tak, aby každá hodnota v kolekci obdržela samostatný řádek pomocí operátoru mv-expand . Tabulku RawEvents použijeme jako zdrojovou a Events jako cílovou tabulku.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Poznámka
Schéma přijaté funkcí musí odpovídat schématu cílové tabulky.
Přidejte zásadu aktualizace do cílové tabulky. Tato zásada automaticky spustí dotaz na všechna nově přijatá data v RawEvents zprostředkující tabulce a ingestuje jeho výsledky do Events tabulky. Definujte zásady nulového uchovávání informací, abyste zabránili zachování zprostředkující tabulky.