使用 Azure Data Explorer Go SDK 擷取資料

Azure 資料總管是一項快速又可高度調整的資料探索服務,可用於處理記錄和遙測資料。 它提供Go SDK 用戶端程式庫,以與 Azure Data Explorer 服務互動。 您可以使用Go SDK來擷取、控制及查詢 Azure Data Explorer叢集中的資料。

在本文中,您會先在測試叢集中建立資料表和資料對應。 然後,您會使用 Go SDK 將擷取排入叢集,並驗證結果。

必要條件

安裝 Go SDK

當您執行 [使用Go 模組的範例應用程式] 時,會自動安裝 Azure Data Explorer Go SDK。 如果您為另一個應用程式安裝 Go SDK,請建立 Go 模組並使用) 擷取 Azure Data Explorer 套件 (go get ,例如:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

套件相依性將會新增至 go.mod 檔案。 在 Go 應用程式中使用它。

檢閱程式碼

檢閱程式碼 區段是選擇性的。 如果您有興趣瞭解程式碼的運作方式,您可以檢閱下列程式碼片段。 或是,您可以直接跳到執行應用程式

Authenticate

此程式在執行任何作業之前,必須先向 Azure Data Explorer服務進行驗證。

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

kusto 的實例。授權是使用服務主體認證所建立。 然後,它會用來建立 kusto。用戶端 ,其中包含也接受叢集端點的 New 函式。

建立資料表

create table 命令是由 Kusto 語句表示。 Mgmt 函式可用來執行管理命令。 它用來執行 命令來建立資料表。

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

提示

Kusto 語句預設為常數,以取得更佳的安全性。 NewStmt 接受字串常數。 UnsafeStmtAPI 允許使用非常數語句區段,但不建議使用。

Kusto create table 命令如下所示:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

建立對應

擷取期間會使用資料對應,將傳入的資料對應至 Azure Data Explorer 資料表中的資料行。 如需詳細資訊,請參閱 資料對應。 使用具有資料庫名稱和適當命令的函式, Mgmt 以與資料表相同的方式建立對應。 範例的 GitHub 存放庫中提供完整的命令。

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

內嵌資料

擷取會使用來自現有Azure Blob 儲存體容器的檔案排入佇列。

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

用戶端是使用擷取來建立 的。新增FromFile函式可用來參考Azure Blob 儲存體 URI。 對應參考名稱和資料類型會以 FileOption的形式傳遞。

執行應用程式

  1. 從 GitHub 複製範例程式碼:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. 執行範例程式碼,如 下列程式碼片段 main.go 所示:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    提示

    若要嘗試不同的作業組合,您可以在 中 main.go 取消批註/批註個別函式。

    當您執行範例程式碼時,會執行下列動作:

    1. 卸載資料表StormEvents 如果資料表存在) ,則會將其卸載 (。
    2. 資料表建立StormEvents 建立資料表。
    3. 對應建立StormEvents_CSV_Mapping 建立對應。
    4. 檔案擷取:Azure Blob 儲存體) 中的 CSV 檔案 (已排入佇列以進行擷取。
  3. 若要建立服務主體以進行驗證,請使用 Azure CLI 搭配 az ad sp create-for-rbac 命令。 使用叢集端點和資料庫名稱,以程式將使用的環境變數形式設定服務主體資訊:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. 執行程式:

    go run main.go
    

    您將會收到類似的輸出:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

驗證和疑難排解

等候 5 到 10 分鐘,讓佇列擷取排程擷取程式,並將資料載入 Azure Data Explorer。

  1. 登入 https://dataexplorer.azure.com,並連線至您的叢集。 然後執行下列命令,以取得資料表中的 StormEvents 記錄計數。

    StormEvents | count
    
  2. 在資料庫中執行下列命令,以查看最後四個小時是否有任何擷取失敗。 先取代資料庫名稱,再執行。

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. 執行下列命令,以檢視最後四個小時內的所有擷取作業狀態。 先取代資料庫名稱,再執行。

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

清除資源

如果您打算遵循我們的其他文章,請保留您所建立的資源。 如果沒有,請在資料庫中執行下列命令來卸載 StormEvents 資料表。

.drop table StormEvents

後續步驟