Incorporer Apache Flink® DataStream dans les tables Azure Databricks Delta Lake
Cet exemple montre comment recevoir des données de streaming dans Azure ADLS Gen2 à partir d’un cluster HDInsight sur AKS dans des tables Delta Lake à l’aide d’Azure Databricks Auto Loader.
Prérequis
- Apache Flink 1.17.0 sur HDInsight sur AKS
- Apache Kafka 3.2 sur HDInsight
- Azure Databricks dans le même réseau virtuel que HDInsight sur AKS
- ADLS Gen2 et un principal de service
Azure Databricks Auto Loader
Databricks Auto Loader facilite le streaming de données dans le stockage d’objets à partir d’applications Flink dans des tables Delta Lake. Auto Loader fournit une source Structured Streaming appelée cloudFiles.
Voici les étapes à suivre pour utiliser des données de Flink dans Azure Databricks Delta Live Tables.
Créer une table Apache Kafka® sur Apache Flink® SQL
Lors de cette étape, vous pouvez créer une table Kafka et ADLS Gen2 sur Flink SQL. Dans ce document, nous utilisons un airplanes_state_real_time table
. Vous pouvez utiliser l’article de votre choix.
Vous devez mettre à jour les adresses IP broker avec votre cluster Kafka dans l’extrait de code.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Ensuite, vous pouvez créer une table ADLS Gen2 sur Flink SQL.
Mettez à jour le nom du conteneur et le nom du compte de stockage dans l’extrait de code avec vos détails ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
De plus, vous pouvez insérer une table Kafka dans une table ADLS Gen2 sur Flink SQL.
Valider le travail de streaming sur Flink
Vérifier le récepteur de données à partir de Kafka dans Stockage Azure dans le portail Azure
Authentification de Stockage Azure et de notebook Azure Databricks
ADLS Gen2 fournit OAuth 2.0 avec votre principal de service d’application Microsoft Entra pour l’authentification à partir d’un notebook Azure Databricks, puis le montage dans Azure Databricks DBFS.
Obtenons l’appid du principal de service, l’ID de locataire et la clé secrète.
Accorder au principal de service le rôle Propriétaire des données Blob du stockage dans le portail Azure
Monter ADLS Gen2 dans DBFS, sur un notebook Azure Databricks
Préparer le notebook
Écrivons le code suivant :
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Définir le pipeline Delta Live Table et exécuter sur Azure Databricks
Vérifier Delta Live Table sur le notebook Azure Databricks
Référence
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).