使用 Kusto .NET SDK 擷取數據
.NET 有兩個用戶端連結庫:內 嵌連結庫 和 資料庫。 如需 .NET SDK 的詳細資訊,請參閱 關於 .NET SDK。 這些程式庫可讓您將資料內嵌 (載入) 至叢集,並從您的程式碼查詢資料。 在本文中,您會先在測試叢集中建立數據表和數據對應。 然後,您將叢集的擷取排入佇列並驗證結果。
必要條件
- Microsoft 帳戶或 Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
- 叢集和資料庫。 建立叢集和資料庫。
安裝內嵌程式庫
Install-Package Microsoft.Azure.Kusto.Ingest
新增驗證和建構 連接字串
驗證
若要驗證應用程式,SDK 會使用您的 Microsoft Entra 租使用者標識碼。 若要尋找您的租用戶識別碼,請使用下列 URL,並以您的網域取代 YourDomain。
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
例如,如果您的網域為 contoso.com,則 URL 會是:https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按一下此 URL 來查看結果;第一行如下所示。
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
在此案例中,租用戶識別碼為 6babcaad-604b-40ac-a9d7-9fd97c0b779f
。
此範例會使用互動式 Microsoft Entra 用戶驗證來存取叢集。 您也可以使用 Microsoft Entra 應用程式驗證搭配憑證或應用程式密碼。 在執行此程序代碼之前,請務必為 tenantId
和 clusterUri
設定正確的值。
SDK 提供方便的方式,將驗證方法設定為 連接字串 的一部分。 如需連接字串的完整檔,請參閱 連接字串。
注意
目前的 SDK 版本不支援 .NET Core 上的互動式用戶驗證。 如有需要,請改用 Microsoft Entra 用戶名稱/密碼或應用程式驗證。
建構連接字串
現在您可以建構 連接字串。 您將在稍後的步驟中建立目的地數據表和對應。
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
設定來源檔案資訊
設定來源檔案的路徑。 本範例使用裝載於 Azure Blob 儲存體的範例檔案。 StormEvents 範例數據集包含來自國家環境資訊中心天氣相關數據。
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
在測試叢集上建立資料表
建立名為 StormEvents
的資料表,該資料表與 StormEvents.csv
檔案中的資料結構描述相符。
提示
下列程式碼片段幾乎每次呼叫都會建立用戶端的執行個體。 這是為了讓每個程式碼片段都可個別執行。 在生產環境中,用戶端執行個體是可重新進入的,且應視需要保留。 即使在使用多個資料庫 (也可以在命令層級上指定資料庫) 時,每個 URI 的單一用戶端執行個體即已足夠。
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
定義擷取對應
將傳入的 CSV 資料對應到建立資料表時使用的資料行名稱。 在該資料表上佈建 CSV 資料行對應物件。
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
定義數據表的批處理原則
批處理傳入數據會優化數據分區大小,此大小是由擷 取批處理原則所控制。 使用 擷取批處理原則管理命令來修改原則。 使用此原則可減少緩慢抵達數據的延遲。
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
建議您定義 Raw Data Size
內嵌數據的值,並以累加方式將大小減少到 250 MB,同時檢查效能是否改善。
您可以使用 Flush Immediately
屬性來略過批處理,雖然不建議進行大規模擷取,因為這可能會導致效能不佳。
將訊息排入佇列以供擷取
將訊息排入佇列,以從 Blob 記憶體提取數據並擷取數據。 系統會建立與擷取叢集的連線,並建立另一個客戶端來處理該端點。
提示
下列程式碼片段幾乎每次呼叫都會建立用戶端的執行個體。 這是為了讓每個程式碼片段都可個別執行。 在生產環境中,用戶端執行個體是可重新進入的,且應視需要保留。 即使在使用多個資料庫 (也可以在命令層級上指定資料庫) 時,每個 URI 的單一用戶端執行個體即已足夠。
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
驗證已將資料內嵌至資料表
等候五到十分鐘,讓佇列擷取排程擷取,並將數據載入您的叢集。 然後執行下列程式碼,以取得 StormEvents
資料表中的記錄計數。
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
執行疑難排解查詢
登入 https://dataexplorer.azure.com,並連線至您的叢集。 在資料庫中執行下列命令,以查看最後四個小時是否有任何擷取失敗。 先取代資料庫名稱,再執行。
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
執行下列命令,以檢視最後四個小時內的所有擷取作業狀態。 先取代資料庫名稱,再執行。
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
清除資源
如果您打算遵循我們的其他文章,請保留您所建立的資源。 否則,請在資料庫中執行下列命令,來清除 StormEvents
資料表。
.drop table StormEvents
相關 contnent
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應