Caricare dati con Delta Live Tables

È possibile caricare dati da qualsiasi origine dati supportata da Apache Spark in Azure Databricks usando Delta Live Tables. È possibile definire set di dati (tabelle e viste) in Delta Live Tables su qualsiasi query che restituisca un DataFrame Spark, inclusi i DataFrame di streaming e Pandas per i DataFrame Spark. Per le attività di inserimento dati, Databricks consiglia di usare le tabelle di streaming per la maggior parte dei casi d'uso. Le tabelle di streaming sono valide per l'inserimento di dati dall'archiviazione di oggetti cloud usando il caricatore automatico o da bus di messaggi come Kafka. Gli esempi seguenti illustrano alcuni modelli comuni.

Importante

Non tutte le origini dati supportano SQL. È possibile combinare notebook SQL e Python in una pipeline Delta Live Tables per usare SQL per tutte le operazioni oltre l'inserimento.

Per informazioni dettagliate sull'uso delle librerie non incluse in tabelle live Delta per impostazione predefinita, vedere Gestire le dipendenze Python per le pipeline di tabelle live Delta.

Caricare file dall'archiviazione di oggetti cloud

Databricks consiglia di usare il caricatore automatico con le tabelle live Delta per la maggior parte delle attività di inserimento dati dall'archiviazione di oggetti cloud. Il caricatore automatico e le tabelle live Delta sono progettati per caricare in modo incrementale e idempotente i dati in continua crescita man mano che arrivano nell'archiviazione cloud. Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:

Nota

Per caricare i file con il Caricatore automatico in una pipeline abilitata per il catalogo Unity, è necessario usare percorsi esterni. Per altre informazioni sull'uso del catalogo Unity con Delta Live Tables, vedere Usare il catalogo Unity con le pipeline Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Vedere Che cos'è il caricatore automatico? e la sintassi SQL del caricatore automatico.

Avviso

Se si usa Il caricatore automatico con le notifiche dei file ed è necessario eseguire un aggiornamento completo per la pipeline o la tabella di streaming, è necessario pulire manualmente le risorse. È possibile usare CloudFilesResourceManager in un notebook per eseguire la pulizia.

Caricare dati da un bus di messaggi

È possibile configurare pipeline di tabelle live Delta per inserire i dati dagli autobus di messaggi con tabelle di streaming. Databricks consiglia di combinare tabelle di streaming con l'esecuzione continua e la scalabilità automatica avanzata per fornire l'inserimento più efficiente per il caricamento a bassa latenza dai bus di messaggi. Vedere Ottimizzare l'utilizzo del cluster delle pipeline di tabelle live Delta con scalabilità automatica avanzata.

Ad esempio, il codice seguente configura una tabella di streaming per inserire dati da Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

È possibile scrivere operazioni downstream in SQL puro per eseguire trasformazioni di streaming su questi dati, come nell'esempio seguente:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Per un esempio di utilizzo di Hub eventi, vedere Usare Hub eventi di Azure come origine dati tabelle live Delta.

Vedere Configurare le origini dati di streaming.

Caricare dati da sistemi esterni

Le tabelle live delta supportano il caricamento di dati da qualsiasi origine dati supportata da Azure Databricks. Vedere Connessione alle origini dati. È anche possibile caricare dati esterni usando Lakehouse Federation per le origini dati supportate. Poiché Lakehouse Federation richiede Databricks Runtime 13.3 LTS o versione successiva, per usare Lakehouse Federation la pipeline deve essere configurata per usare il canale di anteprima.

Alcune origini dati non hanno supporto equivalente in SQL. Se non è possibile usare Lakehouse Federation con una di queste origini dati, è possibile usare un notebook Python autonomo per inserire dati dall'origine. Questo notebook può quindi essere aggiunto come libreria di origine con notebook SQL per creare una pipeline di tabelle live Delta. L'esempio seguente dichiara una vista materializzata per accedere allo stato corrente dei dati in una tabella PostgreSQL remota:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Caricare set di dati statici o di piccole dimensioni dall'archiviazione di oggetti cloud

È possibile caricare set di dati statici o di piccole dimensioni usando la sintassi di caricamento di Apache Spark. Le tabelle live delta supportano tutti i formati di file supportati da Apache Spark in Azure Databricks. Per un elenco completo, vedere Opzioni di formato dati.

Gli esempi seguenti illustrano il caricamento di JSON per creare tabelle Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Nota

Il SELECT * FROM format.`path`; costrutto SQL è comune a tutti gli ambienti SQL in Azure Databricks. È il modello consigliato per l'accesso diretto ai file usando SQL con tabelle live Delta.

Accedere in modo sicuro alle credenziali di archiviazione con segreti in una pipeline

È possibile usare i segreti di Azure Databricks per archiviare credenziali come chiavi di accesso o password. Per configurare il segreto nella pipeline, usare una proprietà Spark nella configurazione del cluster delle impostazioni della pipeline. Vedere Configurare le impostazioni di calcolo.

L'esempio seguente usa un segreto per archiviare una chiave di accesso necessaria per leggere i dati di input da un account di archiviazione di Azure Data Lake Archiviazione Gen2 (ADLS Gen2) usando il caricatore automatico. È possibile usare questo stesso metodo per configurare qualsiasi segreto richiesto dalla pipeline, ad esempio le chiavi AWS per accedere a S3 o la password a un metastore Apache Hive.

Per altre informazioni sull'uso di Azure Data Lake Archiviazione Gen2, vedere Connessione ad Azure Data Lake Archiviazione Gen2 e Archiviazione BLOB.

Nota

È necessario aggiungere il spark.hadoop. prefisso alla spark_conf chiave di configurazione che imposta il valore del segreto.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Sostituzione

  • <storage-account-name> con il nome dell'account di archiviazione DILS Gen2.
  • <scope-name> con il nome dell'ambito del segreto di Azure Databricks.
  • <secret-name> con il nome della chiave contenente la chiave di accesso dell'account di archiviazione di Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Sostituzione

  • <container-name> con il nome del contenitore dell'account di archiviazione di Azure che archivia i dati di input.
  • <storage-account-name> con il nome dell'account di archiviazione DILS Gen2.
  • <path-to-input-dataset> con il percorso del set di dati di input.

Caricare dati da Hub eventi di Azure

Hub eventi di Azure è un servizio di streaming di dati che fornisce un'interfaccia compatibile con Apache Kafka. È possibile usare il connettore Structured Streaming Kafka, incluso nel runtime delle tabelle live Delta, per caricare i messaggi da Hub eventi di Azure. Per altre informazioni sul caricamento e l'elaborazione dei messaggi da Hub eventi di Azure, vedere Usare Hub eventi di Azure come origine dati Delta Live Tables.