使用 Kusto .NET SDK 擷取數據

.NET 有兩個用戶端連結庫:內 嵌連結庫資料庫。 如需 .NET SDK 的詳細資訊,請參閱 關於 .NET SDK。 這些程式庫可讓您將資料內嵌 (載入) 至叢集,並從您的程式碼查詢資料。 在本文中,您會先在測試叢集中建立數據表和數據對應。 然後,您將叢集的擷取排入佇列並驗證結果。

必要條件

  • Microsoft 帳戶或 Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
  • 叢集和資料庫。 建立叢集和資料庫

安裝內嵌程式庫

Install-Package Microsoft.Azure.Kusto.Ingest

新增驗證和建構 連接字串

驗證

若要驗證應用程式,SDK 會使用您的 Microsoft Entra 租使用者標識碼。 若要尋找您的租用戶識別碼,請使用下列 URL,並以您的網域取代 YourDomain

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

例如,如果您的網域為 contoso.com,則 URL 會是:https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按一下此 URL 來查看結果;第一行如下所示。

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在此案例中,租用戶識別碼為 6babcaad-604b-40ac-a9d7-9fd97c0b779f

此範例會使用互動式 Microsoft Entra 用戶驗證來存取叢集。 您也可以使用 Microsoft Entra 應用程式驗證搭配憑證或應用程式密碼。 在執行此程序代碼之前,請務必為 tenantIdclusterUri 設定正確的值。

SDK 提供方便的方式,將驗證方法設定為 連接字串 的一部分。 如需連接字串的完整檔,請參閱 連接字串

注意

目前的 SDK 版本不支援 .NET Core 上的互動式用戶驗證。 如有需要,請改用 Microsoft Entra 用戶名稱/密碼或應用程式驗證。

建構連接字串

現在您可以建構 連接字串。 您將在稍後的步驟中建立目的地數據表和對應。

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

設定來源檔案資訊

設定來源檔案的路徑。 本範例使用裝載於 Azure Blob 儲存體的範例檔案。 StormEvents 範例數據集包含來自國家環境資訊中心天氣相關數據。

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

在測試叢集上建立資料表

建立名為 StormEvents 的資料表,該資料表與 StormEvents.csv 檔案中的資料結構描述相符。

提示

下列程式碼片段幾乎每次呼叫都會建立用戶端的執行個體。 這是為了讓每個程式碼片段都可個別執行。 在生產環境中,用戶端執行個體是可重新進入的,且應視需要保留。 即使在使用多個資料庫 (也可以在命令層級上指定資料庫) 時,每個 URI 的單一用戶端執行個體即已足夠。

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定義擷取對應

將傳入的 CSV 資料對應到建立資料表時使用的資料行名稱。 在該資料表上佈建 CSV 資料行對應物件

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定義數據表的批處理原則

批處理傳入數據會優化數據分區大小,此大小是由擷 取批處理原則所控制。 使用 擷取批處理原則管理命令來修改原則。 使用此原則可減少緩慢抵達數據的延遲。

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

建議您定義 Raw Data Size 內嵌數據的值,並以累加方式將大小減少到 250 MB,同時檢查效能是否改善。

您可以使用 Flush Immediately 屬性來略過批處理,雖然不建議進行大規模擷取,因為這可能會導致效能不佳。

將訊息排入佇列以供擷取

將訊息排入佇列,以從 Blob 記憶體提取數據並擷取數據。 系統會建立與擷取叢集的連線,並建立另一個客戶端來處理該端點。

提示

下列程式碼片段幾乎每次呼叫都會建立用戶端的執行個體。 這是為了讓每個程式碼片段都可個別執行。 在生產環境中,用戶端執行個體是可重新進入的,且應視需要保留。 即使在使用多個資料庫 (也可以在命令層級上指定資料庫) 時,每個 URI 的單一用戶端執行個體即已足夠。

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

驗證已將資料內嵌至資料表

等候五到十分鐘,讓佇列擷取排程擷取,並將數據載入您的叢集。 然後執行下列程式碼,以取得 StormEvents 資料表中的記錄計數。

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

執行疑難排解查詢

登入 https://dataexplorer.azure.com,並連線至您的叢集。 在資料庫中執行下列命令,以查看最後四個小時是否有任何擷取失敗。 先取代資料庫名稱,再執行。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

執行下列命令,以檢視最後四個小時內的所有擷取作業狀態。 先取代資料庫名稱,再執行。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清除資源

如果您打算遵循我們的其他文章,請保留您所建立的資源。 否則,請在資料庫中執行下列命令,來清除 StormEvents 資料表。

.drop table StormEvents