Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Informazioni su come creare una nuova pipeline usando le pipeline dichiarative di Lakeflow Spark (SDP) per l'orchestrazione dei dati e il caricatore automatico. Questa esercitazione estende la pipeline di esempio pulendo i dati e creando una query per trovare i primi 100 utenti.
In questa esercitazione si apprenderà come usare l'editor di Pipelines Lakeflow per:
- Creare una nuova pipeline con la struttura di cartelle predefinita e iniziare con un set di file di esempio.
- Definire vincoli di qualità dei dati usando le aspettative.
- Usare le funzionalità dell'editor per estendere la pipeline con una nuova trasformazione per eseguire analisi sui dati.
Requisiti
Prima di iniziare questa esercitazione, è necessario:
- Accedere a un'area di lavoro di Azure Databricks.
- Abilitare Unity Catalog per l'area di lavoro.
- Assicurarsi che l'editor di pipeline Lakeflow sia abilitato per l'area di lavoro e bisogna aver aderito esplicitamente. Vedere Abilitare l'editor delle pipeline di Lakeflow e il monitoraggio aggiornato.
- Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
- Disporre delle autorizzazioni per creare un nuovo schema in un catalogo. Le autorizzazioni necessarie sono
ALL PRIVILEGESoUSE CATALOGeCREATE SCHEMA.
Passaggio 1: creare una pipeline
In questo passaggio si crea una pipeline usando la struttura di cartelle predefinita e gli esempi di codice. Gli esempi di codice fanno riferimento alla tabella users nei dati di esempio wanderbricks.
Nell'area di lavoro di Azure Databricks fare clic
Nuovo, quindi
pipeline ETL. Si apre l'editor della pipeline nella sezione "Crea una pipeline".
Fare clic sull'intestazione per assegnare un nome alla pipeline.
Appena sotto il nome, scegliere il catalogo predefinito e lo schema per le tabelle di output. Questi vengono usati quando non si specifica un catalogo e uno schema nelle definizioni della pipeline.
In Passaggio successivo per la pipeline fare clic
Iniziare con il codice di esempio nell'icona SQL o
Iniziare con il codice di esempio in Python, in base alle preferenze del linguaggio. In questo modo viene modificata la lingua predefinita per il codice di esempio, ma è possibile aggiungere codice nell'altra lingua in un secondo momento. Verrà creata una struttura di cartelle predefinita con codice di esempio per iniziare.
È possibile visualizzare il codice di esempio nel browser asset della pipeline sul lato sinistro dell'area di lavoro. In
transformationssono presenti due file che generano un set di dati della pipeline ciascuno. Sottoexplorationsc'è un notebook con codice che consente di visualizzare l'output della tua pipeline. Facendo clic su un file è possibile visualizzare e modificare il codice nell'editor.I set di dati di output non sono ancora stati creati e il grafico Pipeline a destra della schermata è vuoto.
Per eseguire il codice della pipeline (il codice nella
transformationscartella), fare clic su Esegui pipeline nella parte superiore destra della schermata.Al termine dell'esecuzione, nella parte inferiore dell'area di lavoro vengono visualizzate le due nuove tabelle che sono state create,
sample_users_<pipeline-name>esample_aggregation_<pipeline-name>. È anche possibile osservare che il grafico della Pipeline a destra dell'area di lavoro ora mostra le due tabelle, incluso chesample_usersè l'origine disample_aggregation.
Passaggio 2: Applicare i controlli di qualità dei dati
In questo passaggio si aggiunge un controllo qualità dei dati alla sample_users tabella. Si usano le condizioni della pipeline per vincolare i dati. In questo caso, si eliminano tutti i record utente che non dispongono di un indirizzo di posta elettronica valido e si restituisce la tabella pulita come users_cleaned.
Nel navigatore risorse della pipeline, fare clic
e selezionare Trasformazione.
Nella finestra di dialogo Crea nuovo file di trasformazione effettuare le selezioni seguenti:
- Scegliere Python o SQL per il linguaggio. Non è necessario che corrisponda alla selezione precedente.
- Assegnare un nome al file. In questo caso scegliere
users_cleaned. - Per Percorso di destinazione lasciare il valore predefinito.
- Per Tipo di set di datilasciare selezionato Nessuno oppure scegliere Visualizzazione materializzata. Se si seleziona Visualizzazione materializzata, viene generato automaticamente il codice di esempio.
Nel nuovo file di codice modificare il codice in modo che corrisponda al seguente (usare SQL o Python, in base alla selezione nella schermata precedente). Sostituire
<pipeline-name>con il nome completo per lasample_userstabella.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Pitone
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Fare clic su Esegui pipeline per aggiornare la pipeline. Ora dovrebbe avere tre tabelle.
Passaggio 3: Analizzare gli utenti principali
Ottenere quindi i primi 100 utenti in base al numero di prenotazioni create. Unire la wanderbricks.bookings tabella alla users_cleaned vista materializzata.
Nel navigatore risorse della pipeline, fare clic
e selezionare Trasformazione.
Nella finestra di dialogo Crea nuovo file di trasformazione effettuare le selezioni seguenti:
- Scegliere Python o SQL per il linguaggio. Non è necessario che corrispondano alle selezioni precedenti.
- Assegnare un nome al file. In questo caso scegliere
users_and_bookings. - Per Percorso di destinazione lasciare il valore predefinito.
- Per Tipo di set di datilasciare selezionato Nessuno.
Nel nuovo file di codice modificare il codice in modo che corrisponda al seguente (usare SQL o Python, in base alla selezione nella schermata precedente).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Pitone
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Fare clic su Esegui pipeline per aggiornare i set di dati. Al termine dell'esecuzione, è possibile vedere in Pipeline Graph che sono presenti quattro tabelle, inclusa la nuova
users_and_bookingstabella.
Passaggi successivi
Ora che si è appreso come usare alcune delle funzionalità dell'editor di pipeline Lakeflow e come creare una pipeline, ecco alcune altre funzionalità per altre informazioni:
Strumenti per l'uso e il debug delle trasformazioni durante la creazione di pipeline:
- Esecuzione selettiva
- Anteprime dei dati
- DAG interattivo (grafico dei set di dati nella pipeline)
Integrazione predefinita dei Pacchetti di asset di Databricks per una collaborazione efficiente, il controllo delle versioni e l'integrazione di CI/CD direttamente dall'editor.