Поделиться через


Прием данных с помощью пакета SDK Для Java для Kusto

Обозреватель данных Azure — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Клиентская библиотека Java может использоваться для приема данных, выполнения команд управления и запроса данных в кластерах Azure Data Explorer.

Из этой статьи вы узнаете, как принимать данные с помощью библиотеки Java для Azure Data Explorer. Во-первых, вы создадите таблицу и выполните сопоставление данных в тестовом кластере. Затем вы поместите в очередь прием данных из хранилища BLOB-объектов в кластер с помощью пакета SDK для Java и проверите результаты.

Предварительные требования

Просмотр кода

Этот раздел является необязательным. Ознакомьтесь с приведенными ниже фрагментами кода, чтобы узнать, как работает код. Чтобы пропустить этот раздел, перейдите на страницу Запуск приложения.

Аутентификация

Программа использует Microsoft Entra учетные данные проверки подлинности с Помощью ConnectionStringBuilder'.

  1. Создание com.microsoft.azure.kusto.data.Client для запросов и управления.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Создание и использование com.microsoft.azure.kusto.ingest.IngestClient для постановки приема данных в очередь Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Команды управления

Команды управления, такие как .drop и .create, выполняются путем вызова execute для com.microsoft.azure.kusto.data.Client объекта .

Например, таблица StormEvents создается следующим образом:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Прием данных

Прием данных помещается в очередь с помощью файла из существующего контейнера хранилища Blob-объектов Azure.

  • Используйте BlobSourceInfo, чтобы указать путь к хранилищу BLOB-объектов.
  • Используйте IngestionProperties для определения таблицы, базы данных, имени сопоставления и типа данных. В следующем примере используется тип данных CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Процесс приема данных начинается в отдельном потоке, а поток main ожидает завершения потока приема. В этом процессе используется CountdownLatch. API приема (IngestClient#ingestFromBlob) не является асинхронным. Цикл while используется для опроса текущего состояния через каждые 5 секунд и ожидает перехода приема из состояния Pending в другое состояние. Конечным состоянием может быть Succeeded, Failed или PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Совет

Существуют и другие методы для асинхронной обработки приема данных для различных приложений. Например, можно использовать CompletableFuture для создания конвейера, определяющего действие после приема, например запрос таблицы или обработка исключений, переданных в IngestionStatus.

Запуск приложения

Общие сведения

Когда вы запускаете пример кода, выполняются следующие действия:

  1. Удаление таблицы: StormEvents удаление таблицы (если она существует).
  2. Создание таблицы: StormEvents создание таблицы.
  3. Создание сопоставления: StormEvents_CSV_Mapping создание сопоставления.
  4. Прием файла: CSV-файл (в хранилище Blob-объектов Azure) помещается в очередь для приема данных.

Ниже приведен пример кода из App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Совет

Чтобы попробовать различные комбинации операций, вы можете раскомментировать или прокомментировать соответствующие методы в App.java.

Запуск приложения

  1. Клонируйте образец кода из GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Задайте сведения о субъекте-службе со следующими сведениями в качестве переменных среды, используемых программой:

    • Конечная точка кластера
    • Имя базы данных
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Выполните сборку:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Результат должен быть аналогичен приведенному ниже:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Подождите несколько минут, пока процесс приема данных завершится. После успешного завершения вы увидите следующее сообщение журнала:Ingestion completed successfully. Вы можете выйти из программы на этом этапе и перейти к следующему шагу, не влияя на процесс приема данных, который уже поставлен в очередь.

Проверить

Подождите в пределах 10 минут, пока не будет запланирован прием данных в очереди, и загрузите данные в Azure Data Explorer.

  1. Войдите в https://dataexplorer.azure.com и подключитесь к кластеру.

  2. Выполните следующую команду, чтобы получить количество записей в таблице StormEvents:

    StormEvents | count
    

Диагностика

  1. Чтобы просмотреть ошибки приема данных в течение последних четырех часов, выполните следующую команду в базе данных:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Выполните следующую команду, чтобы узнать состояние всех операций приема данных за последние четыре часа:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Очистка ресурсов

Если вы не планируете использовать созданные ресурсы, выполните следующую команду в базе данных, чтобы удалить таблицу StormEvents.

.drop table StormEvents