Sdílet prostřednictvím


Ingestování dat pomocí sady Kusto Java SDK

Průzkumník dat Azure je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Klientskou knihovnu Java je možné použít k ingestování dat, příkazům pro správu problémů a dotazování dat v clusterech Azure Data Explorer.

V tomto článku se dozvíte, jak ingestovat data pomocí knihovny Azure Data Explorer Java. Nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak pomocí sady Java SDK za frontu zasadíte příjem dat z úložiště objektů blob do clusteru a ověříte výsledky.

Požadavky

Kontrola kódu

Tato část je volitelná. Projděte si následující fragmenty kódu a zjistěte, jak kód funguje. Pokud chcete tuto část přeskočit, přejděte ke spuštění aplikace.

Authentication

Program používá Microsoft Entra přihlašovací údaje ověřování s ConnectionStringBuilder'.

  1. Vytvořte pro com.microsoft.azure.kusto.data.Client dotazy a správu.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Vytvořte a použijte k frontě com.microsoft.azure.kusto.ingest.IngestClient příjmu dat do Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Příkazy pro správu

Příkazy pro správu, například .drop a .create, se spouští voláním execute objektu com.microsoft.azure.kusto.data.Client .

Například tabulka se StormEvents vytvoří takto:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Přijímání dat

Příjem fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.

  • Použijte BlobSourceInfo k určení cesty ke službě Blob Storage.
  • Slouží IngestionProperties k definování tabulky, databáze, názvu mapování a datového typu. V následujícím příkladu je CSVdatový typ .
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Proces příjmu dat začíná v samostatném vlákně main a vlákno čeká na dokončení vlákna příjmu dat. Tento proces používá CountdownLatch. Rozhraní API pro příjem dat (IngestClient#ingestFromBlob) není asynchronní. Smyčka while se používá k dotazování aktuálního stavu každých 5 sekund a čeká, až se stav příjmu dat změní Pending na jiný. Konečný stav může být Succeeded, Failednebo PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

Existují i jiné metody, které zpracovávají příjem asynchronně pro různé aplikace. Můžete například vytvořit CompletableFuture kanál definující akci po příjmu dat, jako je dotaz na tabulku nebo zpracování výjimek, které byly nahlášeny do IngestionStatus.

Spuštění aplikace

Obecné

Při spuštění ukázkového kódu se provedou následující akce:

  1. Rozevírací tabulka: StormEvents Tabulka se zahodí (pokud existuje).
  2. Vytvoření tabulky: StormEvents Vytvoří se tabulka.
  3. Vytvoření mapování: StormEvents_CSV_Mapping Vytvoří se mapování.
  4. Příjem souborů: Soubor CSV (v Azure Blob Storage) je zařazen do fronty pro příjem dat.

Následující ukázkový kód je z App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tip

Pokud chcete vyzkoušet různé kombinace operací, odkomentujte nebo okomentujte příslušné metody v nástroji App.java.

Spuštění aplikace

  1. Naklonujte ukázkový kód z GitHubu:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Nastavte informace o instančním objektu s následujícími informacemi jako proměnné prostředí používané programem:

    • Koncový bod clusteru
    • Název databáze
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Sestavte a spusťte:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Výstup bude podobný následujícímu:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Počkejte několik minut, než se proces příjmu dat dokončí. Po úspěšném dokončení se zobrazí následující zpráva protokolu: Ingestion completed successfully. V tomto okamžiku můžete program ukončit a přejít k dalšímu kroku, aniž by to mělo vliv na proces příjmu dat, který už byl zařazen do fronty.

Ověření

Počkejte pět až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Explorer.

  1. Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru.

  2. Spuštěním následujícího příkazu získejte počet záznamů v tabulce StormEvents :

    StormEvents | count
    

Řešení potíží

  1. Pokud chcete zobrazit chyby příjmu dat za poslední čtyři hodiny, spusťte ve své databázi následující příkaz:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Pokud chcete zobrazit stav všech operací příjmu dat za poslední čtyři hodiny, spusťte následující příkaz:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Vyčištění prostředků

Pokud nemáte v úmyslu používat vytvořené prostředky, spusťte v databázi následující příkaz, kterým tabulku vyřadíte StormEvents .

.drop table StormEvents