Интеграция azure Data Обозреватель и Apache Flink®
Azure Data Explorer — это полностью управляемая высокопроизводительная платформа для анализа больших данных, которая позволяет легко анализировать большие объемы данных почти в реальном времени.
ADX помогает пользователям анализировать большие объемы данных из потоковых приложений, веб-сайтов, устройств Интернета вещей и т. д. Интеграция Apache Flink с ADX помогает обрабатывать данные в режиме реального времени и анализировать их в ADX.
Необходимые компоненты
Действия по использованию Azure Data Обозреватель в качестве приемника в Flink
Создание кластера Flink.
Создайте ADX с базой данных и таблицей по мере необходимости.
Добавьте разрешения ingestor для управляемого удостоверения в Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Запустите пример программы, определяющей универсальный код ресурса (универсальный идентификатор ресурса) кластера Kusto, используемую базу данных и управляемое удостоверение, а также таблицу, в которую требуется записать.
Клонируйте проект flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git
Создание таблицы в ADX с помощью следующей команды
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Обновите файл FlinkKustoSinkSample.java с правильным универсальным кодом ресурса (URI) кластера Kusto, базой данных и используемым управляемым удостоверением.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Позже создайте проект с помощью "пакета очистки mvn"
Найдите JAR-файл с именем samples-java-1.0-SNAPSHOT-shaded.jar в папке sample-java/target, а затем отправьте этот JAR-файл в пользовательский интерфейс Flink и отправьте задание.
Запросите таблицу Kusto, чтобы проверить выходные данные
Нет задержки при записи данных в таблицу Kusto из Flink.
Ссылка
- Веб-сайт Apache Flink
- Apache, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).