Eseguire il primo carico di lavoro ETL in Azure Databricks

Informazioni su come usare gli strumenti pronti per la produzione di Azure Databricks per sviluppare e distribuire le prime pipeline di estrazione, trasformazione e caricamento (ETL) per l'orchestrazione dei dati.

Al termine di questo articolo, ti sentirai a tuo agio:

  1. Avvio di un cluster di calcolo all-purpose di Databricks.
  2. Creazione di un notebook di Databricks.
  3. Configurazione dell'inserimento incrementale dei dati in Delta Lake con il caricatore automatico.
  4. Esecuzione di celle del notebook per elaborare, eseguire query e visualizzare in anteprima i dati.
  5. Pianificazione di un notebook come processo di Databricks.

Questa esercitazione usa notebook interattivi per completare attività ETL comuni in Python o Scala.

È anche possibile usare tabelle live Delta per compilare pipeline ETL. Databricks ha creato tabelle live Delta per ridurre la complessità della compilazione, della distribuzione e della gestione delle pipeline ETL di produzione. Vedere Esercitazione: Eseguire la prima pipeline di tabelle live Delta.

È anche possibile usare il provider Databricks Terraform per creare le risorse di questo articolo. Vedere Creare cluster, notebook e processi con Terraform.

Requisiti

Nota

Se non si dispone dei privilegi di controllo del cluster, è comunque possibile completare la maggior parte dei passaggi seguenti purché si abbia accesso a un cluster.

Passaggio 1: Creare un cluster

Per eseguire l'analisi esplorativa dei dati e la progettazione dei dati, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.

  1. Fare clic su icona di calcoloCalcolo nella barra laterale.
  2. Nella pagina Calcolo fare clic su Crea cluster. Verrà visualizzata la pagina Nuovo cluster.
  3. Specificare un nome univoco per il cluster, lasciare i valori rimanenti nello stato predefinito e fare clic su Crea cluster.

Per altre informazioni sui cluster Databricks, vedere Calcolo.

Passaggio 2: Creare un notebook di Databricks

Per iniziare a scrivere ed eseguire codice interattivo in Azure Databricks, creare un notebook.

  1. Fare clic su Nuova iconaNuovo nella barra laterale, quindi su Notebook.
  2. Nella pagina Crea notebook:
    • Specificare un nome univoco per il notebook.
    • Assicurarsi che il linguaggio predefinito sia impostato su Python o Scala.
    • Selezionare il cluster creato nel passaggio 1 nell'elenco a discesa Cluster (Cluster ).
    • Fai clic su Crea.

Un notebook viene aperto con una cella vuota nella parte superiore.

Per altre informazioni sulla creazione e la gestione dei notebook, vedere Gestire i notebook.

Passaggio 3: Configurare il caricatore automatico per inserire dati in Delta Lake

Databricks consiglia di usare il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

Databricks consiglia di archiviare i dati con Delta Lake. Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID e abilita data lakehouse. Delta Lake è il formato predefinito per le tabelle create in Databricks.

Per configurare il caricatore automatico per inserire i dati in una tabella Delta Lake, copiare e incollare il codice seguente nella cella vuota del notebook:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Nota

Le variabili definite in questo codice devono consentire di eseguirla in modo sicuro senza rischiare conflitti con gli asset dell'area di lavoro esistenti o altri utenti. Le autorizzazioni di rete o archiviazione limitate genereranno errori durante l'esecuzione di questo codice; contattare l'amministratore dell'area di lavoro per risolvere questi problemi.

Per altre informazioni sul caricatore automatico, vedere Che cos'è il caricatore automatico?.

Passaggio 4: Elaborare e interagire con i dati

I notebook eseguono la logica cell-by-cell. Per eseguire la logica nella cella:

  1. Per eseguire la cella completata nel passaggio precedente, selezionare la cella e premere MAIUSC+INVIO.

  2. Per eseguire una query sulla tabella appena creata, copiare e incollare il codice seguente in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Per visualizzare in anteprima i dati nel dataframe, copiare e incollare il codice seguente in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    Python

    display(df)
    

    Scala

    display(df)
    

Per altre informazioni sulle opzioni interattive per la visualizzazione dei dati, vedere Visualizzazioni nei notebook di Databricks.

Passaggio 5: Pianificare un processo

È possibile eseguire notebook di Databricks come script di produzione aggiungendoli come attività in un processo di Databricks. In questo passaggio verrà creato un nuovo processo che è possibile attivare manualmente.

Per pianificare il notebook come attività:

  1. Fare clic su Pianifica sul lato destro della barra delle intestazioni.
  2. Immettere un nome univoco per il nome del processo.
  3. Fare clic su Manuale.
  4. Nell'elenco a discesa Cluster selezionare il cluster creato nel passaggio 1.
  5. Fai clic su Crea.
  6. Nella finestra visualizzata fare clic su Esegui adesso.
  7. Per visualizzare i risultati dell'esecuzione del processo, fare clic sull'icona Collegamento esterno accanto al timestamp Ultima esecuzione .

Per altre informazioni sui processi, vedere Che cos'è Processi di Azure Databricks?.

Integrazioni aggiuntive

Altre informazioni sulle integrazioni e sugli strumenti per la progettazione dei dati con Azure Databricks: