Kusto Java SDK'sını kullanarak veri alma

Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve üst düzeyde ölçeklenebilir veri keşfetme hizmetidir. Java istemci kitaplığı Azure Veri Gezgini kümelerindeki verileri almak, sorun yönetimi komutları ve verileri sorgulamak için kullanılabilir.

Bu makalede, Azure Veri Gezgini Java kitaplığını kullanarak veri almayı öğrenin. İlk olarak, bir test kümesinde bir tablo ve veri eşlemesi oluşturacaksınız. Ardından Java SDK'sını kullanarak blob depolamadan kümeye bir alımı kuyruğa alır ve sonuçları doğrularsınız.

Önkoşullar

Kodu gözden geçirin

Bu bölüm isteğe bağlıdır. Kodun nasıl çalıştığını öğrenmek için aşağıdaki kod parçacıklarını gözden geçirin. Bu bölümü atlamak için uygulamayı çalıştırmaya gidin.

Kimlik Doğrulaması

Program, ConnectionStringBuilder' ile Microsoft Entra kimlik doğrulaması kimlik bilgilerini kullanır.

  1. Sorgu ve yönetim için bir com.microsoft.azure.kusto.data.Client oluşturun.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Azure Veri Gezgini veri alımını kuyruğa almak için bir com.microsoft.azure.kusto.ingest.IngestClient oluşturun ve kullanın:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Yönetim komutları

ve .creategibi .dropyönetim komutları bir com.microsoft.azure.kusto.data.Client nesne üzerinde çağrılarak execute yürütülür.

Örneğin, StormEvents tablo aşağıdaki gibi oluşturulur:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Veri alımı

Mevcut bir Azure Blob Depolama kapsayıcısından dosya kullanarak kuyruk alımı.

  • Blob Depolama yolunu belirtmek için kullanın BlobSourceInfo .
  • Tablo, veritabanı, eşleme adı ve veri türünü tanımlamak için kullanın IngestionProperties . Aşağıdaki örnekte veri türü şeklindedir CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Alma işlemi ayrı bir iş parçacığında başlar ve main iş parçacığı alım iş parçacığının tamamlanmasını bekler. Bu işlemde CountdownLatch kullanılır. Alma API'si (IngestClient#ingestFromBlob) zaman uyumsuz değil. while Döngü, her 5 saniyede bir geçerli durumu yoklama ve alma durumunun farklı Pending bir duruma geçmesini beklemek için kullanılır. Son durum , Failedveya PartiallySucceededolabilirSucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

İpucu

Farklı uygulamalar için alma işlemini zaman uyumsuz olarak işlemek için başka yöntemler de vardır. Örneğin, tablosunu sorgulama veya öğesine bildirilen özel durumları işleme gibi alım sonrası eylemi tanımlayan bir işlem hattı oluşturmak için IngestionStatuskullanabilirsinizCompletableFuture.

Uygulamayı çalıştırma

Genel

Örnek kodu çalıştırdığınızda aşağıdaki eylemler gerçekleştirilir:

  1. Bırakma tablosu: StormEvents tablo bırakılır (varsa).
  2. Tablo oluşturma: StormEvents tablo oluşturulur.
  3. Eşleme oluşturma: StormEvents_CSV_Mapping eşleme oluşturulur.
  4. Dosya alımı: Alma için bir CSV dosyası (Azure Blob Depolama) kuyruğa alındı.

Aşağıdaki örnek koddan alınmalıdır App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

İpucu

farklı işlem bileşimlerini denemek için içindeki ilgili yöntemleri App.javaaçıklamayı kaldırın/açıklamayı kaldırın.

Uygulamayı çalıştırma

  1. GitHub'dan örnek kodu kopyalayın:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Hizmet sorumlusu bilgilerini aşağıdaki bilgilerle birlikte program tarafından kullanılan ortam değişkenleri olarak ayarlayın:

    • Küme uç noktası
    • Veritabanı adı
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Derleme ve çalıştırma:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Çıkış şuna benzer olacaktır:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Alma işleminin tamamlanması için birkaç dakika bekleyin. Başarıyla tamamlandıktan sonra şu günlük iletisini görürsünüz: Ingestion completed successfully. Bu noktada programdan çıkıp, zaten kuyruğa alınmış olan alma işlemini etkilemeden bir sonraki adıma geçebilirsiniz.

Doğrulama

Kuyruğa alınan alımın alma işlemini zamanlaması ve verileri Azure Veri Gezgini'a yüklemesi için beş ile 10 dakika arasında bekleyin.

  1. https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın.

  2. Tablodaki kayıtların StormEvents sayısını almak için aşağıdaki komutu çalıştırın:

    StormEvents | count
    

Sorun giderme

  1. Son dört saat içindeki alma hatalarını görmek için veritabanınızda aşağıdaki komutu çalıştırın:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Son dört saat içindeki tüm alım işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Kaynakları temizleme

Oluşturduğunuz kaynakları kullanmayı planlamıyorsanız, tabloyu bırakmak StormEvents için veritabanınızda aşağıdaki komutu çalıştırın.

.drop table StormEvents