Прием данных с помощью Apache Flink в azure Data Explorer

Apache Flink — это платформа и модуль распределенной обработки для вычислений с отслеживанием состояния в неограниченных и ограниченных потоках данных.

Соединитель Flink — это открытый код проект, который может выполняться в любом кластере Flink. Он реализует приемник данных для перемещения данных из кластера Flink. С помощью соединителя для Apache Flink можно создавать быстрые и масштабируемые приложения, предназначенные для сценариев на основе данных, например машинного обучения (ML), извлечения и преобразования и загрузки (ETL) и Log Analytics.

Из этой статьи вы узнаете, как использовать соединитель Flink для отправки данных из Flink в таблицу. Вы создаете сопоставление таблиц и данных, направляете Flink для отправки данных в таблицу, а затем проверяете результаты.

Предварительные требования

Для проектов Flink, использующих Maven для управления зависимостями, интегрируйте приемник Flink Connector Core Для Azure Data Explorer, добавив его в качестве зависимости:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Для проектов, которые не используют Maven для управления зависимостями, клонируйте репозиторий для соединителя Azure Data Explorer для Apache Flink и создайте его локально. Такой подход позволяет вручную добавить соединитель в локальный репозиторий Maven с помощью команды mvn clean install -DskipTests.

Вы можете пройти проверку подлинности из Flink в с помощью приложения Microsoft Entra ID или управляемого удостоверения.

Этот субъект-служба будет удостоверением, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставите этому субъекту-службе разрешения на доступ к ресурсам Kusto.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращенных данных JSON скопируйте appId, passwordи tenant для использования в будущем.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

  1. Предоставьте пользователю приложения разрешения на доступ к базе данных:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Предоставьте приложению разрешения ingestor или admin для таблицы. Необходимые разрешения зависят от выбранного метода записи данных. Для SinkV2 достаточно разрешений ingestor, а для WriteAndSink требуются разрешения администратора.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Дополнительные сведения об авторизации см. в статье Управление доступом на основе ролей Kusto.

Чтобы записать данные из Flink, выполните приведенные далее действия.

  1. Импортируйте необходимые параметры:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Используйте приложение или управляемое удостоверение для проверки подлинности.

    Для проверки подлинности приложения:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Для проверки подлинности управляемого удостоверения:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Настройте параметры приемника, такие как база данных и таблица:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Вы можете добавить дополнительные параметры, как описано в следующей таблице:

    Параметр Описание Значение по умолчанию
    IngestionMappingRef Ссылается на существующее сопоставление приема.
    FlushImmediately Немедленно очищает данные и может привести к проблемам с производительностью. Этот метод не рекомендуется.
    BatchIntervalMs Управляет частотой очистки данных. 30 секунд
    BatchSize Задает размер пакета для буферизации записей перед очисткой. 1000 записей
    ClientBatchSizeLimit Указывает размер агрегированных данных перед приемом в МБ. 300 МБ
    PollForIngestionStatus Если значение равно true, соединитель опрашивает состояние приема после очистки данных. false
    DeliveryGuarantee Определяет семантику гарантии доставки. Чтобы достичь семантики ровно один раз, используйте WriteAheadSink. AT_LEAST_ONCE
  2. Записывайте данные потоковой передачи одним из следующих методов:

    • SinkV2. Это параметр без отслеживания состояния, который очищает данные на контрольной точке, обеспечивая по крайней мере один раз согласованность. Рекомендуется использовать этот параметр для приема больших объемов данных.
    • WriteAheadSink: этот метод выдает данные в KustoSink. Она интегрирована с системой контрольных точек Flink и предоставляет гарантии только один раз. Данные хранятся в AbstractStateBackend и фиксируются только после завершения контрольной точки.

    В следующем примере используется SinkV2. Чтобы использовать WriteAheadSink, используйте buildWriteAheadSink метод вместо build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Полный код должен выглядеть примерно так:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Проверка приема данных

После настройки подключения данные отправляются в таблицу. Вы можете проверить, что данные приняты, выполнив запрос KQL.

  1. Выполните следующий запрос, чтобы убедиться, что данные попадали в таблицу:

    <MyTable>
    | count
    
  2. Выполните следующий запрос, чтобы просмотреть данные:

    <MyTable>
    | take 100