Menyerap data menggunakan pustaka Azure Data Explorer Node

Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Azure Data Explorer menyediakan dua pustaka klien untuk Simpul: pustaka penyerapan dan pustaka data. Pustaka ini memungkinkan Anda untuk menyerap (memuat) data ke dalam kluster dan mengkueri data dari kode Anda. Dalam artikel ini, Anda terlebih dahulu membuat tabel dan pemetaan data dalam kluster pengujian. Anda kemudian mengantrekan penyerapan ke kluster dan memvalidasi hasilnya.

Jika Anda tidak memiliki langganan Azure, buat akun Azure gratis sebelum Anda memulai.

Prasyarat

  • Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
  • Kluster dan database Azure Data Explorer. Membuat kluster dan database.
  • Node.js terinstal pada komputer pengembangan Anda

Menginstal pustaka data dan menyerap

Menginstal azure-kusto-ingest dan azure-kusto-data

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Menambahkan pernyataan dan konstanta impor

Mengimpor kelas dari pustaka


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Untuk mengautentikasi aplikasi, Azure Data Explorer menggunakan ID penyewa Microsoft Entra Anda. Untuk menemukan ID penyewa Anda, ikuti Temukan ID penyewa Microsoft 365 Anda.

Atur nilai untuk authorityId, kustoUri, kustoIngestUri dan kustoDatabase sebelum menjalankan kode ini.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Sekarang buat string koneksi. Contoh ini menggunakan autentikasi perangkat untuk mengakses kluster. Periksa output konsol untuk menyelesaikan autentikasi. Anda juga dapat menggunakan sertifikat aplikasi Microsoft Entra, kunci aplikasi, serta pengguna dan kata sandi.

Anda membuat tabel tujuan dan pemetaan di langkah selanjutnya.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Mengatur informasi file sumber

Impor lebih banyak kelas dan atur konstanta untuk file sumber data. Contoh ini menggunakan file sampel yang dihosting di Azure Blob Storage. Himpunan data sampel StormEvents berisi data terkait cuaca dari Pusat Nasional untuk Informasi Lingkungan.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Membuat tabel di kluster pengujian Anda

Buat tabel yang cocok dengan skema data dalam StormEvents.csv file. Ketika kode ini berjalan, kode ini mengembalikan pesan seperti berikut: Untuk masuk, gunakan browser web untuk membuka halaman https://microsoft.com/devicelogin dan memasukkan kode XXXXXXXXX untuk mengautentikasi. Ikuti langkah-langkah untuk masuk, lalu kembali untuk menjalankan blok kode berikutnya. Blok kode berikutnya yang membuat koneksi akan mengharuskan Anda untuk masuk lagi.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Menentukan pemetaan penyerapan

Petakan data CSV masuk ke nama kolom dan jenis data yang digunakan saat membuat tabel.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Mengantrekan pesan untuk penyerapan

Antrekan pesan untuk menarik data dari penyimpanan blob dan menyerap data tersebut ke Azure Data Explorer.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Memvalidasi bahwa tabel berisi data

Validasi bahwa data diserap ke dalam tabel. Tunggu selama lima hingga sepuluh menit hingga penyerapan antrean menjadwalkan penyerapan dan memuat data ke Azure Data Explorer. Lalu jalankan kode berikut untuk mendapatkan hitungan rekaman dalam StormEvents tabel.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Menjalankan kueri pemecahan masalah

Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Membersihkan sumber daya

Jika Anda berencana untuk mengikuti artikel kami yang lain, simpan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut ini di database Anda untuk membersihkan StormEvents tabel.

.drop table StormEvents