Esercitazione: Eseguire una pipeline end-to-end lakehouse analitica
Questa esercitazione illustra come configurare una pipeline di analitica end-to-end per una lakehouse di Azure Databricks.
Importante
Questa esercitazione usa notebook interattivi per completare attività ETL comuni in Python nei cluster abilitati per Il catalogo unity. Se non si usa Unity Catalog, vedere Eseguire il primo carico di lavoro ETL in Azure Databricks.
Attività in questa esercitazione
Al termine di questo articolo, ti sentirai a tuo agio:
- Avvio di un cluster di calcolo abilitato per Unity Catalog.
- Creazione di un notebook di Databricks.
- Scrittura e lettura di dati da una posizione esterna del catalogo Unity.
- Configurazione dell'inserimento incrementale dei dati in una tabella del catalogo Unity con il caricatore automatico.
- Esecuzione di celle del notebook per elaborare, eseguire query e visualizzare in anteprima i dati.
- Pianificazione di un notebook come processo di Databricks.
- Esecuzione di query sulle tabelle del catalogo Unity da Databricks SQL
Azure Databricks offre una suite di strumenti pronti per la produzione che consentono ai professionisti dei dati di sviluppare e distribuire rapidamente pipeline di estrazione, trasformazione e caricamento (ETL). Unity Catalog consente agli amministratori dei dati di configurare e proteggere le credenziali di archiviazione, i percorsi esterni e gli oggetti di database per gli utenti in un'organizzazione. Databricks SQL consente agli analisti di eseguire query SQL sulle stesse tabelle usate nei carichi di lavoro ETL di produzione, consentendo funzionalità di business intelligence in tempo reale su larga scala.
È anche possibile usare tabelle live Delta per compilare pipeline ETL. Databricks ha creato tabelle live Delta per ridurre la complessità della compilazione, della distribuzione e della gestione delle pipeline ETL di produzione. Vedere Esercitazione: Eseguire la prima pipeline di tabelle live Delta.
Requisiti
Nota
Se non si dispone dei privilegi di controllo del cluster, è comunque possibile completare la maggior parte dei passaggi seguenti purché si abbia accesso a un cluster.
Passaggio 1: Creare un cluster
Per eseguire l'analisi esplorativa dei dati e la progettazione dei dati, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.
- Fare clic su Calcolo nella barra laterale.
- Fare clic su Nuovo nella barra laterale e quindi selezionare Cluster( Cluster). Verrà visualizzata la pagina Nuovo cluster/calcolo.
- Specificare un nome univoco per il cluster.
- Selezionare il pulsante di opzione Nodo singolo .
- Selezionare Utente singolo dall'elenco a discesa Modalità di accesso.
- Assicurarsi che l'indirizzo di posta elettronica sia visibile nel campo Utente singolo.
- Selezionare la versione desiderata del runtime di Databricks, 11.1 o versione successiva per usare Unity Catalog.
- Fare clic su Crea calcolo per creare il cluster.
Per altre informazioni sui cluster Databricks, vedere Calcolo.
Passaggio 2: Creare un notebook di Databricks
Per creare un notebook nell'area di lavoro, fare clic su Nuovo nella barra laterale e quindi su Notebook. Viene aperto un notebook vuoto nell'area di lavoro.
Per altre informazioni sulla creazione e la gestione dei notebook, vedere Gestire i notebook.
Passaggio 3: Scrivere e leggere dati da una posizione esterna gestita dal catalogo unity
Databricks consiglia di usare il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.
Usare Il catalogo unity per gestire l'accesso sicuro a posizioni esterne. Gli utenti o le entità servizio con READ FILES
autorizzazioni per una posizione esterna possono usare il caricatore automatico per inserire i dati.
In genere, i dati arriveranno in una posizione esterna a causa delle scritture da altri sistemi. In questa demo è possibile simulare l'arrivo dei dati scrivendo file JSON in un percorso esterno.
Copiare il codice seguente in una cella del notebook. Sostituire il valore stringa per catalog
con il nome di un catalogo con CREATE CATALOG
le autorizzazioni e USE CATALOG
. Sostituire il valore stringa per external_location
con il percorso di un percorso esterno con READ FILES
le autorizzazioni , WRITE FILES
e CREATE EXTERNAL TABLE
.
I percorsi esterni possono essere definiti come un intero contenitore di archiviazione, ma spesso puntano a una directory annidata in un contenitore.
Il formato corretto per un percorso esterno è "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
L'esecuzione di questa cella deve stampare una riga che legge 12 byte, stampare la stringa "Hello world!" e visualizzare tutti i database presenti nel catalogo fornito. Se non è possibile eseguire questa cella, verificare di entrare in un'area di lavoro abilitata per Unity Catalog e richiedere le autorizzazioni appropriate all'amministratore dell'area di lavoro per completare questa esercitazione.
Il codice Python seguente usa l'indirizzo di posta elettronica per creare un database univoco nel catalogo fornito e un percorso di archiviazione univoco nella posizione esterna fornita. L'esecuzione di questa cella rimuoverà tutti i dati associati a questa esercitazione, consentendo di eseguire questo esempio in modo idempotente. Viene definita e creata un'istanza di una classe che verrà usata per simulare batch di dati in arrivo da un sistema connesso alla posizione esterna di origine.
Copiare questo codice in una nuova cella del notebook ed eseguirlo per configurare l'ambiente.
Nota
Le variabili definite in questo codice devono consentire di eseguirla in modo sicuro senza rischiare conflitti con gli asset dell'area di lavoro esistenti o altri utenti. Le autorizzazioni di rete o archiviazione limitate genereranno errori durante l'esecuzione di questo codice; contattare l'amministratore dell'area di lavoro per risolvere questi problemi.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
È ora possibile inserire un batch di dati copiando il codice seguente in una cella ed eseguendolo. È possibile eseguire manualmente questa cella fino a 60 volte per attivare l'arrivo di nuovi dati.
RawData.land_batch()
Passaggio 4: Configurare il caricatore automatico per inserire dati in Unity Catalog
Databricks consiglia di archiviare i dati con Delta Lake. Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID e abilita data lakehouse. Delta Lake è il formato predefinito per le tabelle create in Databricks.
Per configurare il caricatore automatico per inserire dati in una tabella di Catalogo Unity, copiare e incollare il codice seguente in una cella vuota del notebook:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Per altre informazioni sul caricatore automatico, vedere Che cos'è il caricatore automatico?.
Per altre informazioni sullo streaming strutturato con il catalogo Unity, vedere Uso del catalogo Unity con Structured Streaming.
Passaggio 5: Elaborare e interagire con i dati
I notebook eseguono la logica cell-by-cell. Usare questi passaggi per eseguire la logica nella cella:
Per eseguire la cella completata nel passaggio precedente, selezionare la cella e premere MAIUSC+INVIO.
Per eseguire una query sulla tabella appena creata, copiare e incollare il codice seguente in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.
df = spark.read.table(table)
Per visualizzare in anteprima i dati nel dataframe, copiare e incollare il codice seguente in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.
display(df)
Per altre informazioni sulle opzioni interattive per la visualizzazione dei dati, vedere Visualizzazioni nei notebook di Databricks.
Passaggio 6: Pianificare un processo
È possibile eseguire notebook di Databricks come script di produzione aggiungendoli come attività in un processo di Databricks. In questo passaggio verrà creato un nuovo processo che è possibile attivare manualmente.
Per pianificare il notebook come attività:
- Fare clic su Pianifica sul lato destro della barra delle intestazioni.
- Immettere un nome univoco per il nome del processo.
- Fare clic su Manuale.
- Nell'elenco a discesa Cluster selezionare il cluster creato nel passaggio 1.
- Cliccare su Crea.
- Nella finestra visualizzata fare clic su Esegui adesso.
- Per visualizzare i risultati dell'esecuzione del processo, fare clic sull'icona accanto al timestamp Ultima esecuzione .
Per altre informazioni sui processi, vedere Che cosa sono i processi di Databricks?.
Passaggio 7: Eseguire query sulla tabella da Databricks SQL
Chiunque disponga dell'autorizzazione USE CATALOG
per il catalogo corrente, l'autorizzazione USE SCHEMA
per lo schema corrente e SELECT
le autorizzazioni per la tabella possono eseguire query sul contenuto della tabella dall'API Databricks preferita.
È necessario accedere a un'istanza di SQL Warehouse in esecuzione per eseguire query in Databricks SQL.
La tabella creata in precedenza in questa esercitazione ha il nome target_table
. È possibile eseguire una query usando il catalogo fornito nella prima cella e il database con il padre e2e_lakehouse_<your-username>
. È possibile usare Esplora cataloghi per trovare gli oggetti dati creati.
Integrazioni aggiuntive
Altre informazioni sulle integrazioni e sugli strumenti per la progettazione dei dati con Azure Databricks: