Partager via


Incorporer Apache Flink® DataStream dans les tables Azure Databricks Delta Lake

Cet exemple montre comment recevoir des données de streaming dans Azure ADLS Gen2 à partir d’un cluster HDInsight sur AKS dans des tables Delta Lake à l’aide d’Azure Databricks Auto Loader.

Prérequis

Azure Databricks Auto Loader

Databricks Auto Loader facilite le streaming de données dans le stockage d’objets à partir d’applications Flink dans des tables Delta Lake. Auto Loader fournit une source Structured Streaming appelée cloudFiles.

Voici les étapes à suivre pour utiliser des données de Flink dans Azure Databricks Delta Live Tables.

Lors de cette étape, vous pouvez créer une table Kafka et ADLS Gen2 sur Flink SQL. Dans ce document, nous utilisons un airplanes_state_real_time table. Vous pouvez utiliser l’article de votre choix.

Vous devez mettre à jour les adresses IP broker avec votre cluster Kafka dans l’extrait de code.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Ensuite, vous pouvez créer une table ADLS Gen2 sur Flink SQL.

Mettez à jour le nom du conteneur et le nom du compte de stockage dans l’extrait de code avec vos détails ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

De plus, vous pouvez insérer une table Kafka dans une table ADLS Gen2 sur Flink SQL.

Capture d’écran montrant l’insertion d’une table Kafka dans une table ADLSgen2.

Capture d’écran montrant la validation du travail de streaming sur Flink.

Vérifier le récepteur de données à partir de Kafka dans Stockage Azure dans le portail Azure

Capture d’écran montrant la vérification du récepteur de données à partir de Kafka sur stockage Azure.

Authentification de Stockage Azure et de notebook Azure Databricks

ADLS Gen2 fournit OAuth 2.0 avec votre principal de service d’application Microsoft Entra pour l’authentification à partir d’un notebook Azure Databricks, puis le montage dans Azure Databricks DBFS.

Obtenons l’appid du principal de service, l’ID de locataire et la clé secrète.

Capture d’écran montrant l’obention de l’appid du principe de service, l’ID de locataire et la clé secrète.

Accorder au principal de service le rôle Propriétaire des données Blob du stockage dans le portail Azure

Capture d’écran montrant le principe de service Propriétaire des données Blob du stockage sur le portail Azure.

Monter ADLS Gen2 dans DBFS, sur un notebook Azure Databricks

Capture d’écran montrant le montage d’ADLS Gen2 dans DBFS, sur un notebook Azure Databricks.

Préparer le notebook

Écrivons le code suivant :

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Définir le pipeline Delta Live Table et exécuter sur Azure Databricks

Capture d’écran montrant un pipeline Delta Live Table et son exécution sur Azure Databricks.

Capture d’écran montrant un pipeline Delta Live Table et son exécution sur Azure Databricks.

Vérifier Delta Live Table sur le notebook Azure Databricks

Capture d’écran montrant la vérification de Delta Live Table sur Azure Databricks Notebook.

Référence