Menyerap data menggunakan Azure Data Explorer Go SDK

Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Ini menyediakan pustaka klien Go SDK untuk berinteraksi dengan layanan Azure Data Explorer. Anda dapat menggunakan Go SDK untuk menyerap, mengontrol, dan mengkueri data di kluster Azure Data Explorer.

Dalam artikel ini, Anda terlebih dahulu membuat tabel dan pemetaan data dalam kluster pengujian. Anda kemudian mengantre penyerapan ke kluster menggunakan Go SDK dan memvalidasi hasilnya.

Prasyarat

Menginstal Go SDK

Azure Data Explorer Go SDK akan diinstal secara otomatis saat Anda menjalankan [aplikasi sampel yang menggunakan modul Go. Jika Anda menginstal Go SDK untuk aplikasi lain, buat modul Go dan ambil paket Azure Data Explorer (menggunakan go get), misalnya:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Dependensi paket akan ditambahkan ke go.mod file. Gunakan di aplikasi Go Anda.

Mengulas kode

Bagian Tinjau kode ini bersifat opsional. Jika Anda tertarik untuk mempelajari cara kerja kode, Anda dapat meninjau cuplikan kode berikut. Jika tidak, Anda dapat melewati ke depan untuk Menjalankan aplikasi.

Mengautentikasi

Program ini perlu mengautentikasi ke layanan Azure Data Explorer sebelum menjalankan operasi apa pun.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Contoh kusto. Otorisasi dibuat menggunakan kredensial perwakilan layanan. Kemudian digunakan untuk membuat kusto. Klien dengan fungsi Baru yang juga menerima titik akhir kluster.

Buat tabel

Perintah buat tabel diwakili oleh pernyataan Kusto. Fungsi Mgmt digunakan untuk menjalankan perintah manajemen. Ini digunakan untuk menjalankan perintah untuk membuat tabel.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tip

Pernyataan Kusto konstan, secara default, untuk keamanan yang lebih baik. NewStmt menerima konstanta string. UnsafeStmt API memungkinkan penggunaan segmen pernyataan non-konstan, tetapi tidak disarankan.

Perintah buat tabel Kusto adalah sebagai berikut:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Membuat pemetaan

Pemetaan data digunakan selama penyerapan untuk memetakan data masuk ke kolom di dalam tabel Azure Data Explorer. Untuk informasi selengkapnya, lihat pemetaan data. Pemetaan dibuat, dengan cara yang sama seperti tabel, menggunakan Mgmt fungsi dengan nama database dan perintah yang sesuai. Perintah lengkap tersedia di repositori GitHub untuk sampel.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Serap data

Penyerapan diantrekan menggunakan file dari kontainer Azure Blob Storage yang ada.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

Klien Penyerapan dibuat menggunakan penyerapan. Baru. Fungsi FromFile digunakan untuk merujuk ke URI Azure Blob Storage. Nama referensi pemetaan dan jenis data diteruskan dalam bentuk FileOption.

Menjalankan aplikasi

  1. Klon kode sampel dari GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Jalankan kode sampel seperti yang terlihat dalam cuplikan ini dari main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tip

    Untuk mencoba kombinasi operasi yang berbeda, Anda dapat membatalkan komentar/mengomentari fungsi masing-masing di main.go.

    Saat Anda menjalankan kode sampel, tindakan berikut dilakukan:

    1. Jatuhkan tabel: StormEvents tabel dihilangkan (jika ada).
    2. Pembuatan tabel: StormEvents tabel dibuat.
    3. Pembuatan pemetaan: StormEvents_CSV_Mapping pemetaan dibuat.
    4. Penyerapan file: File CSV (dalam Azure Blob Storage) diantrekan untuk diserap.
  3. Untuk membuat perwakilan layanan untuk autentikasi, gunakan Azure CLI dengan perintah az ad sp create-for-rbac . Atur informasi perwakilan layanan dengan titik akhir kluster dan nama database dalam bentuk variabel lingkungan yang akan digunakan oleh program:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Jalankan program:

    go run main.go
    

    Anda akan mendapatkan output serupa:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Memvalidasi dan memecahkan masalah

Tunggu selama 5 hingga 10 menit hingga penyerapan antrean menjadwalkan proses penyerapan dan memuat data ke Azure Data Explorer.

  1. Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Kemudian jalankan perintah berikut untuk mendapatkan hitungan rekaman dalam StormEvents tabel.

    StormEvents | count
    
  2. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Membersihkan sumber daya

Jika Anda berencana untuk mengikuti artikel kami yang lain, pertahankan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut ini di database Anda untuk menghilangkan StormEvents tabel.

.drop table StormEvents

Langkah selanjutnya