Прием данных с помощью Apache Flink в azure Data Explorer
Apache Flink — это платформа и модуль распределенной обработки для вычислений с отслеживанием состояния в неограниченных и ограниченных потоках данных.
Соединитель Flink — это открытый код проект, который может выполняться в любом кластере Flink. Он реализует приемник данных для перемещения данных из кластера Flink. С помощью соединителя для Apache Flink можно создавать быстрые и масштабируемые приложения, предназначенные для сценариев на основе данных, например машинного обучения (ML), извлечения и преобразования и загрузки (ETL) и Log Analytics.
Из этой статьи вы узнаете, как использовать соединитель Flink для отправки данных из Flink в таблицу. Вы создаете сопоставление таблиц и данных, направляете Flink для отправки данных в таблицу, а затем проверяете результаты.
Предварительные требования
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данныхилибазу данных KQL в Real-Time Analytics в Microsoft Fabric.
- Целевая таблица в базе данных. См. статью Создание таблицы в Azure Data Explorer или Создание таблицы в Real-Time Analytics.
- Кластер Apache Flink. Создание кластера.
- Maven 3.x
Получение соединителя Flink
Для проектов Flink, использующих Maven для управления зависимостями, интегрируйте приемник Flink Connector Core Для Azure Data Explorer, добавив его в качестве зависимости:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
Для проектов, которые не используют Maven для управления зависимостями, клонируйте репозиторий для соединителя Azure Data Explorer для Apache Flink и создайте его локально. Такой подход позволяет вручную добавить соединитель в локальный репозиторий Maven с помощью команды mvn clean install -DskipTests
.
Вы можете пройти проверку подлинности из Flink в с помощью приложения Microsoft Entra ID или управляемого удостоверения.
Этот субъект-служба будет удостоверением, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставите этому субъекту-службе разрешения на доступ к ресурсам Kusto.
Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.
az login
Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Создайте субъект-службу. В этом примере принципал службы называется
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Из возвращенных данных JSON скопируйте
appId
,password
иtenant
для использования в будущем.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Вы создали приложение Microsoft Entra и субъект-службу.
Предоставьте пользователю приложения разрешения на доступ к базе данных:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Предоставьте приложению разрешения ingestor или admin для таблицы. Необходимые разрешения зависят от выбранного метода записи данных. Для SinkV2 достаточно разрешений ingestor, а для WriteAndSink требуются разрешения администратора.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Дополнительные сведения об авторизации см. в статье Управление доступом на основе ролей Kusto.
Запись данных из Flink
Чтобы записать данные из Flink, выполните приведенные далее действия.
Импортируйте необходимые параметры:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Используйте приложение или управляемое удостоверение для проверки подлинности.
Для проверки подлинности приложения:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Для проверки подлинности управляемого удостоверения:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Настройте параметры приемника, такие как база данных и таблица:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Вы можете добавить дополнительные параметры, как описано в следующей таблице:
Параметр Описание Значение по умолчанию IngestionMappingRef Ссылается на существующее сопоставление приема. FlushImmediately Немедленно очищает данные и может привести к проблемам с производительностью. Этот метод не рекомендуется. BatchIntervalMs Управляет частотой очистки данных. 30 секунд BatchSize Задает размер пакета для буферизации записей перед очисткой. 1000 записей ClientBatchSizeLimit Указывает размер агрегированных данных перед приемом в МБ. 300 МБ PollForIngestionStatus Если значение равно true, соединитель опрашивает состояние приема после очистки данных. false DeliveryGuarantee Определяет семантику гарантии доставки. Чтобы достичь семантики ровно один раз, используйте WriteAheadSink. AT_LEAST_ONCE Записывайте данные потоковой передачи одним из следующих методов:
- SinkV2. Это параметр без отслеживания состояния, который очищает данные на контрольной точке, обеспечивая по крайней мере один раз согласованность. Рекомендуется использовать этот параметр для приема больших объемов данных.
- WriteAheadSink: этот метод выдает данные в KustoSink. Она интегрирована с системой контрольных точек Flink и предоставляет гарантии только один раз. Данные хранятся в AbstractStateBackend и фиксируются только после завершения контрольной точки.
В следующем примере используется SinkV2. Чтобы использовать WriteAheadSink, используйте
buildWriteAheadSink
метод вместоbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Полный код должен выглядеть примерно так:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Проверка приема данных
После настройки подключения данные отправляются в таблицу. Вы можете проверить, что данные приняты, выполнив запрос KQL.
Выполните следующий запрос, чтобы убедиться, что данные попадали в таблицу:
<MyTable> | count
Выполните следующий запрос, чтобы просмотреть данные:
<MyTable> | take 100
См. также
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по