Bagikan melalui


Menyerap data menggunakan Kusto Java SDK

Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Pustaka klien Java dapat digunakan untuk menyerap data, perintah manajemen masalah, dan data kueri di kluster Azure Data Explorer.

Dalam artikel ini, pelajari cara menyerap data menggunakan pustaka Azure Data Explorer Java. Pertama, Anda akan membuat tabel dan pemetaan data dalam kluster pengujian. Kemudian Anda akan mengantrekan penyerapan dari penyimpanan blob ke kluster menggunakan Java SDK dan memvalidasi hasilnya.

Prasyarat

Mengulas kode

Bagian ini bersifat opsional. Tinjau cuplikan kode berikut untuk mempelajari cara kerja kode. Untuk melewati bagian ini, buka menjalankan aplikasi.

Autentikasi

Program ini menggunakan kredensial autentikasi Microsoft Entra dengan ConnectionStringBuilder'.

  1. Buat com.microsoft.azure.kusto.data.Client untuk kueri dan manajemen.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Buat dan gunakan untuk mengantrekan com.microsoft.azure.kusto.ingest.IngestClient penyerapan data ke Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Perintah manajemen

Perintah manajemen, seperti .drop dan .create, dijalankan dengan memanggil execute objek com.microsoft.azure.kusto.data.Client .

Misalnya, StormEvents tabel dibuat sebagai berikut:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Penyerapan data

Penyerapan antrean dengan menggunakan file dari kontainer Azure Blob Storage yang ada.

  • Gunakan BlobSourceInfo untuk menentukan jalur Blob Storage.
  • Gunakan IngestionProperties untuk menentukan tabel, database, nama pemetaan, dan jenis data. Dalam contoh berikut, jenis datanya adalah CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Proses penyerapan dimulai dalam utas terpisah dan main utas menunggu utas penyerapan selesai. Proses ini menggunakan CountdownLatch. API penyerapan (IngestClient#ingestFromBlob) tidak asinkron. Perulangan while digunakan untuk melakukan polling status saat ini setiap 5 detik dan menunggu status penyerapan berubah dari Pending ke status yang berbeda. Status akhir dapat berupa Succeeded, Failed, atau PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

Ada metode lain untuk menangani penyerapan secara asinkron untuk aplikasi yang berbeda. Misalnya, Anda dapat menggunakan CompletableFuture untuk membuat alur yang menentukan tindakan pasca-penyerapan, seperti mengkueri tabel, atau menangani pengecualian yang dilaporkan ke IngestionStatus.

Menjalankan aplikasi

Umum

Saat Anda menjalankan kode sampel, tindakan berikut dilakukan:

  1. Jatuhkan tabel: StormEvents tabel dihilangkan (jika ada).
  2. Pembuatan tabel: StormEvents tabel dibuat.
  3. Pembuatan pemetaan: StormEvents_CSV_Mapping pemetaan dibuat.
  4. Penyerapan file: File CSV (dalam Azure Blob Storage) diantrekan untuk diserap.

Contoh kode berikut berasal dari App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tip

Untuk mencoba kombinasi operasi yang berbeda, batalkan komentar/komentari metode masing-masing di App.java.

Menjalankan aplikasi

  1. Klon kode sampel dari GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Atur informasi perwakilan layanan dengan informasi berikut sebagai variabel lingkungan yang digunakan oleh program:

    • Titik akhir kluster
    • Nama database
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Bangun dan jalankan:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Outputnya akan mirip dengan:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Tunggu beberapa menit hingga proses penyerapan selesai. Setelah berhasil diselesaikan, Anda akan melihat pesan log berikut: Ingestion completed successfully. Anda dapat keluar dari program pada saat ini dan pindah ke langkah berikutnya tanpa memengaruhi proses penyerapan, yang telah diantrekan.

Memvalidasi

Tunggu lima hingga 10 menit hingga penyerapan antrean menjadwalkan proses penyerapan dan memuat data ke Azure Data Explorer.

  1. Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda.

  2. Jalankan perintah berikut untuk mendapatkan hitungan rekaman dalam StormEvents tabel:

    StormEvents | count
    

Pecahkan masalah

  1. Untuk melihat kegagalan penyerapan dalam empat jam terakhir, jalankan perintah berikut pada database Anda:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Untuk melihat status semua operasi penyerapan dalam empat jam terakhir, jalankan perintah berikut:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Membersihkan sumber daya

Jika Anda tidak berencana menggunakan sumber daya yang telah Anda buat, jalankan perintah berikut ini di database Anda untuk menghilangkan StormEvents tabel.

.drop table StormEvents