Inserire dati con Kusto Java SDK

Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. La libreria client Java può essere usata per inserire dati, eseguire comandi di gestione dei problemi ed eseguire query sui dati nei cluster Esplora dati di Azure.

Questo articolo illustra come inserire dati usando la libreria Java di Azure Esplora dati. Prima di tutto, si creeranno una tabella e un mapping dei dati in un cluster di test. Si accoderà quindi un inserimento dall'archivio BLOB al cluster usando Java SDK e si convalidano i risultati.

Prerequisiti

Esaminare il codice

Questa sezione è facoltativa. Esaminare i frammenti di codice seguenti per informazioni sul funzionamento del codice. Per ignorare questa sezione, passare all'esecuzione dell'applicazione.

Authentication

Il programma usa Microsoft Entra credenziali di autenticazione con ConnectionStringBuilder'.

  1. Creare un oggetto com.microsoft.azure.kusto.data.Client per query e gestione.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Creare e usare per com.microsoft.azure.kusto.ingest.IngestClient accodare l'inserimento dei dati in Azure Esplora dati:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Comandi di gestione

I comandi di gestione, ad esempio .drop e .create, vengono eseguiti chiamando execute su un com.microsoft.azure.kusto.data.Client oggetto .

Ad esempio, la StormEvents tabella viene creata come segue:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Inserimento dati

Accodamento tramite un file da un contenitore di Archiviazione BLOB di Azure esistente.

  • Usare BlobSourceInfo per specificare il percorso di archiviazione BLOB.
  • Usare IngestionProperties per definire tabella, database, nome di mapping e tipo di dati. Nell'esempio seguente il tipo di dati è CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Il processo di inserimento inizia in un thread separato e il thread attende il main completamento del thread di inserimento. Questo processo usa CountdownLatch. L'API di inserimento (IngestClient#ingestFromBlob) non è asincrona. Un while ciclo viene usato per eseguire il polling dello stato corrente ogni 5 secondi e attende che lo stato di inserimento cambi da Pending a uno stato diverso. Lo stato finale può essere Succeeded, Failedo PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Suggerimento

Esistono altri metodi per gestire l'inserimento in modo asincrono per applicazioni diverse. Ad esempio, è possibile usare CompletableFuture per creare una pipeline che definisce l'azione dopo l'inserimento, ad esempio eseguire una query sulla tabella o gestire le eccezioni segnalate all'oggetto IngestionStatus.

Eseguire l'applicazione

Generale

Quando si esegue il codice di esempio, vengono eseguite le azioni seguenti:

  1. Elimina tabella: StormEvents la tabella viene eliminata (se esistente).
  2. Creazione tabella: StormEvents viene creata una tabella.
  3. Creazione del mapping: StormEvents_CSV_Mapping viene creato il mapping.
  4. Inserimento file: un file CSV (in Archiviazione BLOB di Azure) viene accodato per l'inserimento.

Il codice di esempio seguente è tratto da App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Suggerimento

Per provare diverse combinazioni di operazioni, rimuovere/commentare i rispettivi metodi in App.java.

Eseguire l'applicazione

  1. Clonare il codice di esempio da GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Impostare le informazioni sull'entità servizio con le informazioni seguenti come variabili di ambiente usate dal programma:

    • Endpoint del cluster
    • Nome database
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Compilare ed eseguire:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    L'output sarà analogo al seguente:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Attendere alcuni minuti per il completamento del processo di inserimento. Al termine, verrà visualizzato il messaggio di log seguente: Ingestion completed successfully. A questo punto è possibile uscire dal programma e passare al passaggio successivo senza influire sul processo di inserimento, che è già stato accodato.

Convalida

Attendere da cinque a 10 minuti per consentire all'inserimento in coda di pianificare il processo di inserimento e caricare i dati in Azure Esplora dati.

  1. Accedere al https://dataexplorer.azure.com e connettersi al cluster.

  2. Eseguire il comando seguente per ottenere il numero di record nella StormEvents tabella:

    StormEvents | count
    

Risolvere problemi

  1. Per visualizzare gli errori di inserimento nelle ultime quattro ore, eseguire il comando seguente nel database:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore, eseguire il comando seguente:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Pulire le risorse

Se non si prevede di usare le risorse create, eseguire il comando seguente nel database per eliminare la StormEvents tabella.

.drop table StormEvents