Menyerap data menggunakan Kusto .NET SDK
Ada dua pustaka klien untuk .NET: pustaka penyerapan dan pustaka data. Untuk informasi selengkapnya tentang .NET SDK, lihat tentang .NET SDK. Pustaka ini memungkinkan Anda untuk menyerap (memuat) data ke dalam kluster dan mengkueri data dari kode Anda. Dalam artikel ini, Anda terlebih dahulu membuat tabel dan pemetaan data dalam kluster pengujian. Anda kemudian mengantrekan penyerapan ke kluster dan memvalidasi hasilnya.
Prasyarat
- Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
- Kluster dan database. Membuat kluster dan database.
Menginstal pustaka penyerapan
Install-Package Microsoft.Azure.Kusto.Ingest
Menambahkan string koneksi autentikasi dan konstruksi
Autentikasi
Untuk mengautentikasi aplikasi, SDK menggunakan ID penyewa Microsoft Entra Anda. Untuk menemukan ID penyewa Anda, gunakan URL berikut, ganti domain Anda untuk YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Misalnya, jika domain Anda contoso.com, URL-nya adalah: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik URL ini untuk melihat hasilnya; baris pertama adalah sebagai berikut.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
ID penyewa dalam hal ini adalah 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
Contoh ini menggunakan autentikasi pengguna Microsoft Entra interaktif untuk mengakses kluster. Anda juga dapat menggunakan autentikasi aplikasi Microsoft Entra dengan sertifikat atau rahasia aplikasi. Pastikan untuk mengatur nilai yang benar untuk tenantId
dan clusterUri
sebelum menjalankan kode ini.
SDK menyediakan cara mudah untuk menyiapkan metode autentikasi sebagai bagian dari string koneksi. Untuk dokumentasi lengkap tentang string koneksi, lihat string koneksi.
Catatan
Versi SDK saat ini tidak mendukung autentikasi pengguna interaktif di .NET Core. Jika diperlukan, gunakan nama pengguna/kata sandi Microsoft Entra atau autentikasi aplikasi sebagai gantinya.
Membangun string koneksi
Sekarang Anda dapat membangun string koneksi. Anda akan membuat tabel tujuan dan pemetaan di langkah selanjutnya.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Mengatur informasi file sumber
Atur jalur untuk file sumber. Contoh ini menggunakan file sampel yang dihosting di Azure Blob Storage. Himpunan data sampel StormEvents berisi data terkait cuaca dari Pusat Nasional untuk Informasi Lingkungan.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Membuat tabel di kluster pengujian Anda
Buat tabel bernama StormEvents
yang cocok dengan skema data dalam file StormEvents.csv
.
Tip
Cuplikan kode berikut membuat instans klien untuk hampir setiap panggilan. Hal ini dilakukan untuk membuat setiap cuplikan secara individual dapat dijalankan. Dalam produksi, instans klien bersifat reentrant, dan harus disimpan selama yang diperlukan. Instans klien tunggal per URI sudah cukup, bahkan ketika bekerja dengan beberapa database (database dapat ditentukan pada tingkat perintah).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Menentukan pemetaan penyerapan
Petakan data CSV yang masuk ke nama kolom yang digunakan saat membuat tabel. Provisikan objek pemetaan kolom CSV pada tabel tersebut.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Menentukan kebijakan batching untuk tabel Anda
Data masuk batching mengoptimalkan ukuran shard data, yang dikontrol oleh kebijakan batching penyerapan. Ubah kebijakan dengan perintah manajemen kebijakan batching penyerapan. Gunakan kebijakan ini untuk mengurangi latensi data yang tiba secara perlahan.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Sebaiknya tetapkan nilai untuk data yang Raw Data Size
diserap dan secara bertahap mengurangi ukuran menjadi 250 MB, sambil memeriksa apakah performa meningkat.
Anda dapat menggunakan Flush Immediately
properti untuk melewati batching, meskipun ini tidak disarankan untuk penyerapan skala besar karena dapat menyebabkan performa yang buruk.
Mengantrekan pesan untuk penyerapan
Antrekan pesan untuk menarik data dari penyimpanan blob dan menyerap data. Koneksi dibuat ke kluster penyerapan, dan klien lain dibuat untuk bekerja dengan titik akhir tersebut.
Tip
Cuplikan kode berikut membuat instans klien untuk hampir setiap panggilan. Hal ini dilakukan untuk membuat setiap cuplikan secara individual dapat dijalankan. Dalam produksi, instans klien bersifat reentrant, dan harus disimpan selama yang diperlukan. Instans klien tunggal per URI sudah cukup, bahkan ketika bekerja dengan beberapa database (database dapat ditentukan pada tingkat perintah).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Memvalidasi data diserap ke dalam tabel
Tunggu lima hingga sepuluh menit hingga penyerapan antrean menjadwalkan penyerapan dan memuat data ke dalam kluster Anda. Kemudian jalankan kode berikut untuk mendapatkan jumlah rekaman dalam StormEvents
tabel.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Menjalankan kueri pemecahan masalah
Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Membersihkan sumber daya
Jika Anda berencana untuk mengikuti artikel kami yang lain, simpan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut dalam database Anda untuk membersihkan StormEvents
tabel.
.drop table StormEvents