Ingestování dat pomocí sady Azure Data Explorer Go SDK
Průzkumník dat Azure je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Poskytuje klientskou knihovnu sady Go SDK pro interakci se službou Azure Data Explorer. Sadu Go SDK můžete použít k ingestování, řízení a dotazování dat v clusterech Azure Data Explorer.
V tomto článku nejprve vytvoříte mapování tabulek a dat v testovacím clusteru. Potom ingestování dat do clusteru za frontu pomocí sady Go SDK a ověříte výsledky.
Požadavky
- Účet Microsoft nebo Microsoft Entra identitu uživatele. Předplatné Azure není povinné.
- Cluster a databáze Azure Data Explorer. Vytvořte cluster a databázi.
- Nainstalujte Git.
- Nainstalujte Go s následujícími minimálními požadavky na sadu Go SDK.
- Vytvořte registraci aplikace a udělte jí oprávnění k databázi. Uložte ID klienta a tajný klíč klienta pro pozdější použití.
Instalace sady Go SDK
Sada Azure Data Explorer Go SDK se automaticky nainstaluje při spuštění [ukázkové aplikace, která používá moduly Go. Pokud jste nainstalovali sadu Go SDK pro jinou aplikaci, vytvořte modul Go a načtěte balíček Azure Data Explorer (pomocí go get
), například:
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
Závislost balíčku se přidá do go.mod
souboru . Použijte ho v aplikaci Go.
Kontrola kódu
Tato část Kontrola kódu je volitelná. Pokud se chcete dozvědět, jak kód funguje, můžete si projít následující fragmenty kódu. V opačném případě můžete přeskočit k části Spuštění aplikace.
Ověření
Program se před provedením jakýchkoli operací musí ověřit ve službě Azure Data Explorer.
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
Instance kusto. Autorizace se vytváří pomocí přihlašovacích údajů instančního objektu. Potom se použije k vytvoření kusto. Klient s funkcí New , která přijímá také koncový bod clusteru.
Vytvoření tabulky
Příkaz create table je reprezentován příkazem Kusto. Funkce Mgmt slouží ke spouštění příkazů pro správu. Slouží ke spuštění příkazu k vytvoření tabulky.
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
Tip
Příkaz Kusto je pro lepší zabezpečení ve výchozím nastavení konstantní.
NewStmt
přijímá řetězcové konstanty. Rozhraní UnsafeStmt
API umožňuje používat segmenty ne constantních příkazů, ale nedoporučuje se.
Příkaz Kusto create table je následující:
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
Vytvoření mapování
Mapování dat se během příjmu dat používá k mapování příchozích dat na sloupce v tabulkách Azure Data Explorer. Další informace najdete v tématu mapování dat. Mapování se vytvoří stejným způsobem jako tabulka pomocí Mgmt
funkce s názvem databáze a příslušným příkazem. Kompletní příkaz je k dispozici v úložišti GitHub pro ukázku.
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
Ingestace dat
Příjem dat je zařazen do fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
Klient příjmu dat se vytvoří pomocí ingestování. Nové. Funkce FromFile slouží k odkaz na identifikátor URI Azure Blob Storage. Název odkazu mapování a datový typ se předávají ve formě FileOption.
Spuštění aplikace
Naklonujte vzorový kód z GitHubu:
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
Spusťte vzorový kód, jak je vidět v tomto fragmentu kódu:
main.go
func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
Tip
Pokud chcete vyzkoušet různé kombinace operací, můžete odkomentovat nebo okomentovat příslušné funkce v
main.go
souboru .Při spuštění ukázkového kódu se provedou následující akce:
-
Přetažení tabulky:
StormEvents
Tabulka se vyřadí (pokud existuje). -
Vytvoření tabulky:
StormEvents
Vytvoří se tabulka. -
Vytvoření mapování:
StormEvents_CSV_Mapping
Vytvoří se mapování. - Příjem souborů: Soubor CSV (v Azure Blob Storage) je zařazen do fronty pro příjem dat.
-
Přetažení tabulky:
Pokud chcete vytvořit instanční objekt pro ověřování, použijte Azure CLI s příkazem az ad sp create-for-rbac . Nastavte informace o instančním objektu pomocí koncového bodu clusteru a názvu databáze ve formě proměnných prostředí, které bude program používat:
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Spusťte program:
go run main.go
Získáte podobný výstup:
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
Ověření a řešení potíží
Počkejte 5 až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Explorer.
Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Potom spuštěním následujícího příkazu získejte počet záznamů v tabulce
StormEvents
.StormEvents | count
Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Vyčištění prostředků
Pokud chcete postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spusťte v databázi následující příkaz, který tabulku vyřadí StormEvents
.
.drop table StormEvents