Bagikan melalui


Menyerap data menggunakan Kusto .NET SDK

Ada dua pustaka klien untuk .NET: pustaka penyerapan dan pustaka data. Untuk informasi selengkapnya tentang .NET SDK, lihat tentang .NET SDK. Pustaka ini memungkinkan Anda untuk menyerap (memuat) data ke dalam kluster dan mengkueri data dari kode Anda. Dalam artikel ini, Anda terlebih dahulu membuat tabel dan pemetaan data dalam kluster pengujian. Anda kemudian mengantrekan penyerapan ke kluster dan memvalidasi hasilnya.

Prasyarat

  • Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
  • Kluster dan database. Membuat kluster dan database.

Menginstal pustaka penyerapan

Install-Package Microsoft.Azure.Kusto.Ingest

Menambahkan string koneksi autentikasi dan konstruksi

Autentikasi

Untuk mengautentikasi aplikasi, SDK menggunakan ID penyewa Microsoft Entra Anda. Untuk menemukan ID penyewa Anda, gunakan URL berikut, ganti domain Anda untuk YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Misalnya, jika domain Anda contoso.com, URL-nya adalah: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik URL ini untuk melihat hasilnya; baris pertama adalah sebagai berikut.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

ID penyewa dalam hal ini adalah 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Contoh ini menggunakan autentikasi pengguna Microsoft Entra interaktif untuk mengakses kluster. Anda juga dapat menggunakan autentikasi aplikasi Microsoft Entra dengan sertifikat atau rahasia aplikasi. Pastikan untuk mengatur nilai yang benar untuk tenantId dan clusterUri sebelum menjalankan kode ini.

SDK menyediakan cara mudah untuk menyiapkan metode autentikasi sebagai bagian dari string koneksi. Untuk dokumentasi lengkap tentang string koneksi, lihat string koneksi.

Catatan

Versi SDK saat ini tidak mendukung autentikasi pengguna interaktif di .NET Core. Jika diperlukan, gunakan nama pengguna/kata sandi Microsoft Entra atau autentikasi aplikasi sebagai gantinya.

Membangun string koneksi

Sekarang Anda dapat membangun string koneksi. Anda akan membuat tabel tujuan dan pemetaan di langkah selanjutnya.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Mengatur informasi file sumber

Atur jalur untuk file sumber. Contoh ini menggunakan file sampel yang dihosting di Azure Blob Storage. Himpunan data sampel StormEvents berisi data terkait cuaca dari Pusat Nasional untuk Informasi Lingkungan.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Membuat tabel di kluster pengujian Anda

Buat tabel bernama StormEvents yang cocok dengan skema data dalam file StormEvents.csv.

Tip

Cuplikan kode berikut membuat instans klien untuk hampir setiap panggilan. Hal ini dilakukan untuk membuat setiap cuplikan secara individual dapat dijalankan. Dalam produksi, instans klien bersifat reentrant, dan harus disimpan selama yang diperlukan. Instans klien tunggal per URI sudah cukup, bahkan ketika bekerja dengan beberapa database (database dapat ditentukan pada tingkat perintah).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Menentukan pemetaan penyerapan

Petakan data CSV yang masuk ke nama kolom yang digunakan saat membuat tabel. Provisikan objek pemetaan kolom CSV pada tabel tersebut.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Menentukan kebijakan batching untuk tabel Anda

Data masuk batching mengoptimalkan ukuran shard data, yang dikontrol oleh kebijakan batching penyerapan. Ubah kebijakan dengan perintah manajemen kebijakan batching penyerapan. Gunakan kebijakan ini untuk mengurangi latensi data yang tiba secara perlahan.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Sebaiknya tetapkan nilai untuk data yang Raw Data Size diserap dan secara bertahap mengurangi ukuran menjadi 250 MB, sambil memeriksa apakah performa meningkat.

Anda dapat menggunakan Flush Immediately properti untuk melewati batching, meskipun ini tidak disarankan untuk penyerapan skala besar karena dapat menyebabkan performa yang buruk.

Mengantrekan pesan untuk penyerapan

Antrekan pesan untuk menarik data dari penyimpanan blob dan menyerap data. Koneksi dibuat ke kluster penyerapan, dan klien lain dibuat untuk bekerja dengan titik akhir tersebut.

Tip

Cuplikan kode berikut membuat instans klien untuk hampir setiap panggilan. Hal ini dilakukan untuk membuat setiap cuplikan secara individual dapat dijalankan. Dalam produksi, instans klien bersifat reentrant, dan harus disimpan selama yang diperlukan. Instans klien tunggal per URI sudah cukup, bahkan ketika bekerja dengan beberapa database (database dapat ditentukan pada tingkat perintah).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Memvalidasi data diserap ke dalam tabel

Tunggu lima hingga sepuluh menit hingga penyerapan antrean menjadwalkan penyerapan dan memuat data ke dalam kluster Anda. Kemudian jalankan kode berikut untuk mendapatkan jumlah rekaman dalam StormEvents tabel.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Menjalankan kueri pemecahan masalah

Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Membersihkan sumber daya

Jika Anda berencana untuk mengikuti artikel kami yang lain, simpan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut dalam database Anda untuk membersihkan StormEvents tabel.

.drop table StormEvents