Sdílet prostřednictvím


Ingestování dat pomocí sady Azure Data Explorer Go SDK

Průzkumník dat Azure je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Poskytuje klientskou knihovnu sady Go SDK pro interakci se službou Azure Data Explorer. Sadu Go SDK můžete použít k ingestování, řízení a dotazování dat v clusterech Azure Data Explorer.

V tomto článku nejprve vytvoříte mapování tabulek a dat v testovacím clusteru. Potom ingestování dat do clusteru za frontu pomocí sady Go SDK a ověříte výsledky.

Požadavky

Instalace sady Go SDK

Sada Azure Data Explorer Go SDK se automaticky nainstaluje při spuštění [ukázkové aplikace, která používá moduly Go. Pokud jste nainstalovali sadu Go SDK pro jinou aplikaci, vytvořte modul Go a načtěte balíček Azure Data Explorer (pomocí go get), například:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Závislost balíčku se přidá do go.mod souboru . Použijte ho v aplikaci Go.

Kontrola kódu

Tato část Kontrola kódu je volitelná. Pokud se chcete dozvědět, jak kód funguje, můžete si projít následující fragmenty kódu. V opačném případě můžete přeskočit k části Spuštění aplikace.

Ověření

Program se před provedením jakýchkoli operací musí ověřit ve službě Azure Data Explorer.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Instance kusto. Autorizace se vytváří pomocí přihlašovacích údajů instančního objektu. Potom se použije k vytvoření kusto. Klient s funkcí New , která přijímá také koncový bod clusteru.

Vytvoření tabulky

Příkaz create table je reprezentován příkazem Kusto. Funkce Mgmt slouží ke spouštění příkazů pro správu. Slouží ke spuštění příkazu k vytvoření tabulky.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tip

Příkaz Kusto je pro lepší zabezpečení ve výchozím nastavení konstantní. NewStmt přijímá řetězcové konstanty. Rozhraní UnsafeStmt API umožňuje používat segmenty ne constantních příkazů, ale nedoporučuje se.

Příkaz Kusto create table je následující:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Vytvoření mapování

Mapování dat se během příjmu dat používá k mapování příchozích dat na sloupce v tabulkách Azure Data Explorer. Další informace najdete v tématu mapování dat. Mapování se vytvoří stejným způsobem jako tabulka pomocí Mgmt funkce s názvem databáze a příslušným příkazem. Kompletní příkaz je k dispozici v úložišti GitHub pro ukázku.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingestace dat

Příjem dat je zařazen do fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

Klient příjmu dat se vytvoří pomocí ingestování. Nové. Funkce FromFile slouží k odkaz na identifikátor URI Azure Blob Storage. Název odkazu mapování a datový typ se předávají ve formě FileOption.

Spuštění aplikace

  1. Naklonujte vzorový kód z GitHubu:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Spusťte vzorový kód, jak je vidět v tomto fragmentu kódu:main.go

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tip

    Pokud chcete vyzkoušet různé kombinace operací, můžete odkomentovat nebo okomentovat příslušné funkce v main.gosouboru .

    Při spuštění ukázkového kódu se provedou následující akce:

    1. Přetažení tabulky: StormEvents Tabulka se vyřadí (pokud existuje).
    2. Vytvoření tabulky: StormEvents Vytvoří se tabulka.
    3. Vytvoření mapování: StormEvents_CSV_Mapping Vytvoří se mapování.
    4. Příjem souborů: Soubor CSV (v Azure Blob Storage) je zařazen do fronty pro příjem dat.
  3. Pokud chcete vytvořit instanční objekt pro ověřování, použijte Azure CLI s příkazem az ad sp create-for-rbac . Nastavte informace o instančním objektu pomocí koncového bodu clusteru a názvu databáze ve formě proměnných prostředí, které bude program používat:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Spusťte program:

    go run main.go
    

    Získáte podobný výstup:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Ověření a řešení potíží

Počkejte 5 až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Explorer.

  1. Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Potom spuštěním následujícího příkazu získejte počet záznamů v tabulce StormEvents .

    StormEvents | count
    
  2. Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Vyčištění prostředků

Pokud chcete postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spusťte v databázi následující příkaz, který tabulku vyřadí StormEvents .

.drop table StormEvents

Další krok