Inserire dati usando la libreria Node di Esplora dati di Azure

Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure offre due librerie client per Node: una libreria di inserimento e una libreria di dati. Queste librerie consentono di inserire (caricare) i dati in un cluster ed eseguire una query di dati dal codice. In questo articolo viene prima creato un mapping di tabella e dati in un cluster di test. Quindi viene accodato l'inserimento nel cluster e vengono convalidati i risultati.

Se non si ha una sottoscrizione di Azure, creare un account Azure gratuito prima di iniziare.

Prerequisiti

  • Un account Microsoft o un'identità utente di Microsoft Entra. Non è necessaria una sottoscrizione di Azure.
  • Un cluster e un database di Esplora dati di Azure. Creare un cluster e un database.
  • Node.js installato nel computer di sviluppo

Installare le librerie di dati e di inserimento

Installare azure-kusto-ingest e azure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Aggiungere le costanti e le istruzioni import

Importare le classi dalle librerie


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Per autenticare un'applicazione, Azure Esplora dati usa l'ID tenant Microsoft Entra. Per trovare l'ID tenant, seguire Trovare l'ID tenant di Microsoft 365.

Impostare i valori per authorityId, kustoUri, kustoIngestUri e kustoDatabase prima di eseguire questo codice.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Costruire ora la stringa di connessione. Questo esempio usa l'autenticazione del dispositivo per accedere al cluster. Controllare l'output della console per completare l'autenticazione. È anche possibile usare un certificato dell'applicazione Microsoft Entra, la chiave dell'applicazione e l'utente e la password.

Creare la tabella di destinazione e il mapping in un passaggio successivo.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Impostare le informazioni sul file di origine

Importare altre classi e impostare costanti per il file di origine dati. Questo esempio usa un file di esempio ospitato nell'archiviazione BLOB di Azure. Il set di dati di esempio StormEvents contiene dati relativi al meteo provenienti dai Centri nazionali per le informazioni ambientali.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Creare una tabella nel cluster di prova

Creare una tabella che corrisponda allo schema dei dati nel file StormEvents.csv. Quando viene eseguito questo codice, restituisce un messaggio simile al seguente: Per accedere, usare un Web browser per aprire la pagina https://microsoft.com/devicelogin e immettere il codice XXXXXXXXX per l'autenticazione. Seguire i passaggi per l'accesso, quindi tornare a eseguire il blocco di codice successivo. I blocchi di codice successivi che stabiliscono una connessione richiederanno di eseguire nuovamente l'accesso.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Definire il mapping di inserimento

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna e tipi di dati usati durante la creazione della tabella.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Accodare un messaggio per l'inserimento

Accodare un messaggio per eseguire il pull dei dati dall'archiviazione BLOB e inserire i dati in Esplora dati di Azure.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Verificare che la tabella contenga dati

Verificare che i dati siano stati inseriti nella tabella. Attendere da cinque a dieci minuti per l'operazione di inserimento in coda per pianificare l'inserimento e caricare i dati in Esplora dati di Azure. Eseguire quindi il codice seguente per ottenere il numero di record nella tabella StormEvents.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Eseguire query sulla risoluzione dei problemi

Accedere al https://dataexplorer.azure.com e connettersi al cluster. Eseguire il comando seguente nel database per verificare la presenza di eventuali errori di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Eseguire il comando seguente per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Pulire le risorse

Se si prevede di seguire gli altri articoli, mantenere le risorse create. In caso contrario, eseguire il comando seguente nel database per pulire la tabella StormEvents.

.drop table StormEvents