Включение Apache Flink® DataStream в таблицы Azure Databricks Delta Lake
В этом примере показано, как приемник потоковых данных в Azure ADLS 2-го поколения из кластера Apache Flink в HDInsight в AKS в таблицы Delta Lake с помощью автозагрузчика Azure Databricks.
Необходимые компоненты
- Apache Flink 1.17.0 в HDInsight в AKS
- Apache Kafka 3.2 в HDInsight
- Azure Databricks в той же виртуальной сети, что и HDInsight в AKS
- ADLS 2-го поколения и субъект-служба
Автозагрузчик Azure Databricks
Автозагрузчик Databricks упрощает потоковую передачу данных в хранилище объектов из приложений Flink в таблицы Delta Lake. Автозагрузчик предоставляет источник структурированной потоковой передачи, называемый cloudFiles.
Ниже приведены инструкции по использованию данных из Flink в динамических таблицах Azure Databricks delta.
Создание таблицы Apache Kafka® в Apache Flink® SQL
На этом шаге можно создать таблицу Kafka и ADLS 2-го поколения в Flink SQL. В этом документе мы используем .airplanes_state_real_time table
Вы можете использовать любую статью по своему усмотрению.
Необходимо обновить IP-адреса брокера с помощью кластера Kafka в фрагменте кода.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Затем можно создать таблицу ADLSgen2 в Flink SQL.
Обновите имя контейнера и имя учетной записи хранения в фрагменте кода с подробными сведениями ADLS 2-го поколения.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Кроме того, можно вставить таблицу Kafka в таблицу ADLSgen2 в Flink SQL.
Проверка задания потоковой передачи в Flink
Проверьте приемник данных из Kafka в служба хранилища Azure на портал Azure
Проверка подлинности записной книжки служба хранилища Azure и Azure Databricks
ADLS 2-го поколения предоставляет OAuth 2.0 субъекту-службе приложений Microsoft Entra для проверки подлинности из записной книжки Azure Databricks, а затем подключиться к DBFS Azure Databricks.
Давайте получим идентификатор приложения, идентификатор клиента и секретный ключ принципа службы.
Предоставление принципу службы служба хранилища владельца данных BLOB-объектов портал Azure
Подключение ADLS 2-го поколения к DBFS в записной книжке Azure Databricks
Подготовка записной книжки
Давайте напишем следующий код:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Определение конвейера динамической таблицы Delta Live Table и запуск в Azure Databricks
Проверка динамической таблицы Delta в записной книжке Azure Databricks
Справочные материалы
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).