Condividi tramite


Incorporare Apache Flink® DataStream in tabelle Delta Lake di Azure Databricks

Questo esempio illustra come eseguire il sink dei dati di flusso in Azure ADLS Gen2 dal cluster Apache Flink in HDInsight nel servizio Azure Kubernetes in tabelle Delta Lake usando il caricatore automatico di Azure Databricks.

Prerequisiti

Caricatore automatico di Azure Databricks

Il caricatore automatico di Databricks semplifica lo streaming dei dati nell'archiviazione di oggetti dalle applicazioni Flink alle tabelle Delta Lake. Il caricatore automatico fornisce un'origine structured streaming denominata cloudFiles.

Ecco i passaggi per usare i dati da Flink nelle tabelle live delta di Azure Databricks.

In questo passaggio è possibile creare una tabella Kafka e ADLS Gen2 in Flink SQL. In questo documento si usa un oggetto airplanes_state_real_time table. È possibile usare qualsiasi articolo di propria scelta.

È necessario aggiornare gli indirizzi IP broker con il cluster Kafka nel frammento di codice.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Successivamente, è possibile creare una tabella ADLSgen2 in Flink SQL.

Aggiornare il nome del contenitore e il nome dell'account di archiviazione nel frammento di codice con i dettagli di ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

È inoltre possibile inserire la tabella Kafka nella tabella ADLSgen2 in Flink SQL.

Screenshot che mostra l'inserimento della tabella Kafka nella tabella ADLSgen2.

Screenshot che mostra la convalida del processo di streaming in Flink.

Controllare il sink di dati da Kafka in Archiviazione di Azure in portale di Azure

Screenshot che mostra il controllo del sink di dati da Kafka in Archiviazione di Azure.

Autenticazione di Archiviazione di Azure e del notebook di Azure Databricks

ADLS Gen2 fornisce OAuth 2.0 con l'entità servizio dell'applicazione Microsoft Entra per l'autenticazione da un notebook di Azure Databricks e quindi montare in Azure Databricks DBFS.

Ottenere l'appid dell'entità servizio, l'ID tenant e la chiave privata.

Screenshot che mostra l'id appid dell'entità servizio, l'ID tenant e la chiave privata.

Concedere l'entità servizio al proprietario dei dati BLOB Archiviazione in portale di Azure

Screenshot che mostra l'entità servizio il proprietario dei dati BLOB Archiviazione in portale di Azure.

Montare ADLS Gen2 in DBFS nel notebook di Azure Databricks

Screenshot che mostra il montaggio di ADLS Gen2 in DBFS nel notebook di Azure Databricks.

Preparare il notebook

Scrivere il codice seguente:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definire la pipeline di tabelle live delta ed eseguirla in Azure Databricks

Screenshot che mostra delta live table pipeline ed esecuzione in Azure Databricks.

Screenshot che mostra la pipeline di tabelle attive Delta ed è in esecuzione in Azure Databricks.

Controllare la tabella live delta nel notebook di Azure Databricks

Screenshot che mostra il controllo Delta Live Table nel notebook di Azure Databricks.

Riferimento