Ingestování dat pomocí knihovny Azure Data Explorer Node
Průzkumník dat Azure je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Azure Data Explorer nabízí dvě klientské knihovny pro Node: knihovnu ingestování a knihovnu dat. Tyto knihovny umožňují snadno ingestovat (načíst) data do clusteru a dotazovat se na data z vašeho kódu. V tomto článku nejprve vytvoříte mapování tabulek a dat v testovacím clusteru. Pak vytvoříte frontu ingestace do clusteru a ověříte výsledky.
Pokud ještě nemáte předplatné Azure, vytvořte si bezplatný účet Azure před tím, než začnete.
Požadavky
- Účet Microsoft nebo Microsoft Entra identitu uživatele. Předplatné Azure není povinné.
- Cluster a databáze Azure Data Explorer. Vytvořte cluster a databázi.
- Node.js nainstalované na počítači pro vývoj
Instalace knihovny dat a knihovny ingestů
Nainstalujte azure-kusto-ingest a azure-kusto-data.
npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2
Přidání příkazů a konstant pro import
Importujte třídy z knihoven.
const { Client: KustoClient, KustoConnectionStringBuilder } = require('azure-kusto-data');
const {
IngestClient: KustoIngestClient,
IngestionProperties,
IngestionDescriptors,
DataFormat,
IngestionMappingKind,
} = require("azure-kusto-ingest");
Azure Data Explorer k ověření aplikace používá ID vašeho tenanta Microsoft Entra. Pokud chcete zjistit ID tenanta, postupujte podle pokynů v tématu Zjištění ID tenanta Microsoftu 365.
Před spuštěním tohoto kódu nastavte hodnoty authorityId
, kustoUri
, kustoIngestUri
a kustoDatabase
.
const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase = "Weather";
Teď sestavte připojovací řetězec. V tomto příkladu se pro přístup ke clusteru používá ověřování pomocí zařízení. Zkontrolujte výstup konzoly a dokončete ověřování. Můžete také použít Microsoft Entra certifikát aplikace, klíč aplikace a uživatele a heslo.
Cílovou tabulku a mapování vytvoříte v pozdějším kroku.
const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";
Nastavení informací o zdrojovém souboru
Importujte další třídy a nastavte konstanty pro soubor zdroje dat. Tento příklad používá ukázkový soubor hostovaný v Azure Blob Storage. Ukázková datová sada StormEvents obsahuje data související s počasím z Národních center pro informace o životním prostředí.
const container = "samplefiles";
const account = "kustosamples";
const sas = ""; // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;
Vytvoření tabulky v testovacím clusteru
Vytvořte tabulku, která odpovídá schématu dat v souboru StormEvents.csv
. Když se tento kód spustí, vrátí podobnou zprávu: Pokud se chcete přihlásit, otevřete ve webovém prohlížeči stránku https://microsoft.com/devicelogin a zadejte kód XXXXXXXXX k ověření. Podle pokynů se přihlaste a pak se vraťte a spusťte další blok kódu. Následující bloky kódu, které provedou připojení, budou vyžadovat, abyste se znovu přihlásili.
const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;
const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);
Definování mapování ingestace
Namapujte příchozí data CSV na názvy sloupců a datové typy použité při vytváření tabulky.
const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;
const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);
Přidání zprávy do fronty pro ingestaci
Přidejte zprávu do fronty k získání dat z úložiště objektů blob a tato data ingestujte do Průzkumníka dat Azure.
const defaultProps = new IngestionProperties({
database: kustoDatabase,
table: destTable,
format: DataFormat.CSV,
ingestionMappingReference: destTableMapping,
ingestionMappingKind: IngestionMappingKind.CSV,
additionalProperties: {ignoreFirstRecord: true},
});
const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
const blobDesc = new BlobDescriptor(blobPath, 10);
try {
const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
// Handle errors
}
Ověření, že tabulka obsahuje data
Ověřte, že se data ingestovala do tabulky. Počkejte pět až deset minut, než ingestace ve frontě naplánuje ingestování a načte data do Průzkumníka dat Azure. Pak spuštěním následujícího kódu získejte počet záznamů v tabulce StormEvents
.
const query = `${destTable} | count`;
var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);
Spuštění dotazů pro řešení potíží
Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Vyčištění prostředků
Pokud chcete postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spuštěním následujícího příkazu v databázi tabulku StormEvents
vyčistěte.
.drop table StormEvents