共用方式為


使用 Azure 資料總管 Python 程式庫內嵌資料

在本文中,您會使用 Azure Data Explorer Python 程式庫內嵌資料。 Azure 資料總管是一項快速又可高度調整的資料探索服務,可用於處理記錄和遙測資料。 Azure 資料總管提供兩個適用於 Python 的用戶端程式庫:內嵌程式庫資料程式庫。 這些程式庫可讓您將資料內嵌或載入叢集,並從程式碼查詢資料。

首先,在叢集中建立資料表和資料對應。 然後,您將叢集的擷取排入佇列並驗證結果。

必要條件

  • Microsoft 帳戶或Microsoft Entra使用者身分識別。 不需要 Azure 訂用帳戶。
  • Azure 資料總管叢集和資料庫。 建立叢集和資料庫
  • Python 3.4+

安裝資料並內嵌程式庫

安裝 azure-kusto-dataazure-kusto-ingest

pip install azure-kusto-data
pip install azure-kusto-ingest

新增 Import 陳述式和常數

從 zure-kusto-data 匯入類別。

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

若要驗證應用程式,Azure Data Explorer會使用您的Microsoft Entra租使用者識別碼。 若要尋找您的租使用者識別碼,請使用下列 URL,將網域取代為 YourDomain

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

例如,如果您的網域為 contoso.com,則 URL 會是:https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按一下此 URL 來查看結果;第一行如下所示。

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在此案例中,租用戶識別碼為 6babcaad-604b-40ac-a9d7-9fd97c0b779f。 先設定 AAD_TENANT_ID、KUSTO_URI、KUSTO_INGEST_URI 和 KUSTO_DATABASE 的值,再執行此程式碼。

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

現在來建構連接字串。 下列範例會使用裝置驗證來存取叢集。 您也可以使用受控識別驗證、Microsoft Entra應用程式憑證、Microsoft Entra應用程式金鑰,以及Microsoft Entra使用者和密碼

您可以在稍後的步驟中建立目的地資料表和對應。

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

設定來源檔案資訊

匯入其他類別,並設定資料來源檔案的常數。 本範例使用裝載於 Azure Blob 儲存體的範例檔案。 StormEvents範例資料集包含來自國家環境資訊中心天氣相關資料。

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

在叢集上建立資料表

建立符合 StormEvents.csv 檔案中資料結構描述的資料表。 當此程式碼執行時,它會傳回類似下列訊息的訊息: 若要登入,請使用網頁瀏覽器開啟頁面 https://microsoft.com/devicelogin ,並輸入程式碼F3W4VWZDM進行驗證。 請遵循下列步驟來登入,然後返回執行下一個程式碼區塊。 建立連線的後續程式碼區塊需要您重新登入。

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

定義擷取對應

將傳入的 CSV 資料對應到建立資料表時使用的資料行名稱與資料類型。 這會將來源資料欄位對應至目的地資料表資料行

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

將訊息排入佇列以供擷取

將訊息放入佇列,以從 Blob 儲存體提取資料,並將該資料內嵌至 Azure 資料總管。

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

查詢已內嵌至資料表的資料

等候五到 10 分鐘,讓佇列擷取排程擷取,並將資料載入 Azure Data Explorer。 然後執行下列程式碼,以取得 StormEvents 資料表中的記錄計數。

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

執行疑難排解查詢

登入 https://dataexplorer.azure.com,並連線至您的叢集。 在資料庫中執行下列命令,以查看最後四個小時是否有任何擷取失敗。 先取代資料庫名稱,再執行。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

執行下列命令,以檢視最後四個小時內的所有擷取作業狀態。 先取代資料庫名稱,再執行。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清除資源

如果您打算遵循我們的其他文章,請保留您所建立的資源。 否則,請在資料庫中執行下列命令,來清除 StormEvents 資料表。

.drop table StormEvents

後續步驟