Menyerap data menggunakan Azure Data Explorer Go SDK
Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Ini menyediakan pustaka klien Go SDK untuk berinteraksi dengan layanan Azure Data Explorer. Anda dapat menggunakan Go SDK untuk menyerap, mengontrol, dan mengkueri data di kluster Azure Data Explorer.
Dalam artikel ini, Anda terlebih dahulu membuat tabel dan pemetaan data dalam kluster pengujian. Anda kemudian mengantre penyerapan ke kluster menggunakan Go SDK dan memvalidasi hasilnya.
Prasyarat
- Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
- Kluster dan database Azure Data Explorer. Membuat kluster dan database.
- Instal Git.
- Instal Go dengan persyaratan minimum Go SDK berikut.
- Buat Pendaftaran Aplikasi dan berikan izin ke database. Simpan ID klien dan rahasia klien untuk digunakan nanti.
Menginstal Go SDK
Azure Data Explorer Go SDK akan diinstal secara otomatis saat Anda menjalankan [aplikasi sampel yang menggunakan modul Go. Jika Anda menginstal Go SDK untuk aplikasi lain, buat modul Go dan ambil paket Azure Data Explorer (menggunakan go get
), misalnya:
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
Dependensi paket akan ditambahkan ke go.mod
file. Gunakan di aplikasi Go Anda.
Mengulas kode
Bagian Tinjau kode ini bersifat opsional. Jika Anda tertarik untuk mempelajari cara kerja kode, Anda dapat meninjau cuplikan kode berikut. Jika tidak, Anda dapat melewati ke depan untuk Menjalankan aplikasi.
Mengautentikasi
Program ini perlu mengautentikasi ke layanan Azure Data Explorer sebelum menjalankan operasi apa pun.
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
Contoh kusto. Otorisasi dibuat menggunakan kredensial perwakilan layanan. Kemudian digunakan untuk membuat kusto. Klien dengan fungsi Baru yang juga menerima titik akhir kluster.
Buat tabel
Perintah buat tabel diwakili oleh pernyataan Kusto. Fungsi Mgmt digunakan untuk menjalankan perintah manajemen. Ini digunakan untuk menjalankan perintah untuk membuat tabel.
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
Tip
Pernyataan Kusto konstan, secara default, untuk keamanan yang lebih baik.
NewStmt
menerima konstanta string.
UnsafeStmt
API memungkinkan penggunaan segmen pernyataan non-konstan, tetapi tidak disarankan.
Perintah buat tabel Kusto adalah sebagai berikut:
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
Membuat pemetaan
Pemetaan data digunakan selama penyerapan untuk memetakan data masuk ke kolom di dalam tabel Azure Data Explorer. Untuk informasi selengkapnya, lihat pemetaan data. Pemetaan dibuat, dengan cara yang sama seperti tabel, menggunakan Mgmt
fungsi dengan nama database dan perintah yang sesuai. Perintah lengkap tersedia di repositori GitHub untuk sampel.
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
Serap data
Penyerapan diantrekan menggunakan file dari kontainer Azure Blob Storage yang ada.
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
Klien Penyerapan dibuat menggunakan penyerapan. Baru. Fungsi FromFile digunakan untuk merujuk ke URI Azure Blob Storage. Nama referensi pemetaan dan jenis data diteruskan dalam bentuk FileOption.
Menjalankan aplikasi
Klon kode sampel dari GitHub:
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
Jalankan kode sampel seperti yang terlihat dalam cuplikan ini dari
main.go
:func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
Tip
Untuk mencoba kombinasi operasi yang berbeda, Anda dapat membatalkan komentar/mengomentari fungsi masing-masing di
main.go
.Saat Anda menjalankan kode sampel, tindakan berikut dilakukan:
-
Jatuhkan tabel:
StormEvents
tabel dihilangkan (jika ada). -
Pembuatan tabel:
StormEvents
tabel dibuat. -
Pembuatan pemetaan:
StormEvents_CSV_Mapping
pemetaan dibuat. - Penyerapan file: File CSV (dalam Azure Blob Storage) diantrekan untuk diserap.
-
Jatuhkan tabel:
Untuk membuat perwakilan layanan untuk autentikasi, gunakan Azure CLI dengan perintah az ad sp create-for-rbac . Atur informasi perwakilan layanan dengan titik akhir kluster dan nama database dalam bentuk variabel lingkungan yang akan digunakan oleh program:
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Jalankan program:
go run main.go
Anda akan mendapatkan output serupa:
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
Memvalidasi dan memecahkan masalah
Tunggu selama 5 hingga 10 menit hingga penyerapan antrean menjadwalkan proses penyerapan dan memuat data ke Azure Data Explorer.
Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Kemudian jalankan perintah berikut untuk mendapatkan hitungan rekaman dalam
StormEvents
tabel.StormEvents | count
Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Membersihkan sumber daya
Jika Anda berencana untuk mengikuti artikel kami yang lain, pertahankan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut ini di database Anda untuk menghilangkan StormEvents
tabel.
.drop table StormEvents