Incorporare Apache Flink® DataStream in tabelle Delta Lake di Azure Databricks
Questo esempio illustra come eseguire il sink dei dati di flusso in Azure ADLS Gen2 dal cluster Apache Flink in HDInsight nel servizio Azure Kubernetes in tabelle Delta Lake usando il caricatore automatico di Azure Databricks.
Prerequisiti
- Apache Flink 1.17.0 in HDInsight nel servizio Azure Kubernetes
- Apache Kafka 3.2 in HDInsight
- Azure Databricks nella stessa rete virtuale di HDInsight nel servizio Azure Kubernetes
- ADLS Gen2 e entità servizio
Caricatore automatico di Azure Databricks
Il caricatore automatico di Databricks semplifica lo streaming dei dati nell'archiviazione di oggetti dalle applicazioni Flink alle tabelle Delta Lake. Il caricatore automatico fornisce un'origine structured streaming denominata cloudFiles.
Ecco i passaggi per usare i dati da Flink nelle tabelle live delta di Azure Databricks.
Creare una tabella Apache Kafka® in Apache Flink® SQL
In questo passaggio è possibile creare una tabella Kafka e ADLS Gen2 in Flink SQL. In questo documento si usa un oggetto airplanes_state_real_time table
. È possibile usare qualsiasi articolo di propria scelta.
È necessario aggiornare gli indirizzi IP broker con il cluster Kafka nel frammento di codice.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Successivamente, è possibile creare una tabella ADLSgen2 in Flink SQL.
Aggiornare il nome del contenitore e il nome dell'account di archiviazione nel frammento di codice con i dettagli di ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
È inoltre possibile inserire la tabella Kafka nella tabella ADLSgen2 in Flink SQL.
Convalidare il processo di streaming in Flink
Controllare il sink di dati da Kafka in Archiviazione di Azure in portale di Azure
Autenticazione di Archiviazione di Azure e del notebook di Azure Databricks
ADLS Gen2 fornisce OAuth 2.0 con l'entità servizio dell'applicazione Microsoft Entra per l'autenticazione da un notebook di Azure Databricks e quindi montare in Azure Databricks DBFS.
Ottenere l'appid dell'entità servizio, l'ID tenant e la chiave privata.
Concedere l'entità servizio al proprietario dei dati BLOB Archiviazione in portale di Azure
Montare ADLS Gen2 in DBFS nel notebook di Azure Databricks
Preparare il notebook
Scrivere il codice seguente:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definire la pipeline di tabelle live delta ed eseguirla in Azure Databricks
Controllare la tabella live delta nel notebook di Azure Databricks
Riferimento
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).