Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Informazioni su come creare e distribuire una pipeline ETL (estrazione, trasformazione e caricamento) per l'orchestrazione dei dati usando le pipeline dichiarative di Lakeflow e il caricatore automatico. Una pipeline ETL implementa i passaggi per leggere i dati dai sistemi di origine, trasformare tali dati in base ai requisiti, ad esempio i controlli della qualità dei dati e registrare la deduplicazione dei dati e scrivere i dati in un sistema di destinazione, ad esempio un data warehouse o un data lake.
In questa esercitazione si useranno le pipeline dichiarative di Lakeflow e il caricatore automatico per:
- Inserire dati di origine non elaborati in una tabella di destinazione.
- Trasformare i dati di origine non elaborati e scrivere i dati trasformati in due viste materializzate di destinazione.
- Interrogare i dati trasformati.
- Automatizzare la pipeline ETL con un lavoro di Databricks.
Per altre informazioni sulle pipeline dichiarative di Lakeflow e sul caricatore automatico, vedere Pipeline dichiarative di Lakeflow e Che cos'è il caricatore automatico?
Requisiti
Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:
- Accedere a un'area di lavoro di Azure Databricks.
- Assicurati che Unity Catalog sia abilitato per la tua area di lavoro.
- L'abilitazione del calcolo serverless è attiva per il tuo account. Le pipeline dichiarative di Serverless Lakeflow non sono disponibili in tutte le aree dell'area di lavoro. Vedere Funzionalità con disponibilità a livello di area limitata per le aree disponibili.
- Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
- Disporre delle autorizzazioni per creare un nuovo schema in un catalogo. Le autorizzazioni necessarie sono
ALL PRIVILEGES
oUSE CATALOG
eCREATE SCHEMA
. - Disporre delle autorizzazioni per creare un nuovo volume in uno schema esistente. Le autorizzazioni necessarie sono
ALL PRIVILEGES
oUSE SCHEMA
eCREATE VOLUME
.
Informazioni sul set di dati
Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.
Passaggio 1: Creare una pipeline
Per prima cosa, si creerà una pipeline ETL in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines crea pipeline risolvendo le dipendenze definite nei notebook o nei file (denominati codice sorgente) usando la sintassi delle pipeline dichiarative di Lakeflow. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più notebook o file specifici della lingua nella pipeline. Per altre informazioni, vedere Pipeline dichiarative di Lakeflow
Importante
Lasciare vuoto il campo Codice sorgente per creare e configurare un notebook per la creazione automatica del codice sorgente.
Questo tutorial usa il calcolo serverless e Unity Catalog. Per tutte le opzioni di configurazione non specificate, usare le impostazioni predefinite. Se l'ambiente di calcolo serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite. Se si usano le impostazioni di calcolo predefinite, è necessario selezionare manualmente Il catalogo Unity in Opzioni di archiviazione nella sezione Destinazione dell'interfaccia utente crea pipeline .
Per creare una nuova pipeline ETL in Pipeline Dichiarative di Lakeflow, seguire questi passaggi:
- Nell'area di lavoro fare clic
Processi e pipeline nella barra laterale.
- In Nuovo fare clic su Pipeline ETL.
- In il nome pipeline, digita un nome di pipeline univoco.
- Selezionare la casella di controllo Serverless.
- In Destinazione, per configurare un percorso del catalogo Unity in cui vengono pubblicate le tabelle, selezionare un catalogo esistente e scrivere un nuovo nome in Schema per creare un nuovo schema nel catalogo.
- Fare clic su Crea.
Viene visualizzata l'interfaccia utente della pipeline per la nuova pipeline.
Passaggio 2: Sviluppare una pipeline
Importante
I notebook possono contenere solo un singolo linguaggio di programmazione. Non mischiare il codice Python e SQL nei notebook del codice sorgente della pipeline.
In questo passaggio si useranno i notebook di Databricks per sviluppare e convalidare il codice sorgente per le pipeline dichiarative di Lakeflow in modo interattivo.
Il codice usa il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud. Per altre informazioni, vedere Che cos'è il caricatore automatico?
Un notebook di codice sorgente vuoto viene creato e configurato automaticamente per la pipeline. Il notebook viene creato in una nuova directory nella tua directory utente. Il nome della nuova directory e quello del file corrispondono al nome della pipeline. Ad esempio: /Users/someone@example.com/my_pipeline/my_pipeline
.
Quando si sviluppa una pipeline, è possibile scegliere Python o SQL. Sono inclusi esempi per entrambe le lingue. In base alla lingua scelta, verificare di selezionare la lingua predefinita del notebook. Per altre informazioni sul supporto dei notebook per lo sviluppo di codice di Pipeline dichiarative di Lakeflow, vedere Sviluppare ed eseguire il debug di pipeline ETL con un notebook in Pipeline dichiarative di Lakeflow.
Un collegamento per accedere a questo notebook si trova nel campo codice sorgente nel pannello dettagli del pipeline. Fare clic sul collegamento per aprire il notebook prima di procedere al passaggio successivo.
Fare clic su Connetti in alto a destra per aprire il menu di configurazione delle risorse di calcolo.
Passare il puntatore del mouse sul nome della pipeline creata nel passaggio 1.
Fare clic su Connetti.
Accanto al titolo del notebook nella parte superiore selezionare il linguaggio predefinito del notebook (Python o SQL).
Copiare e incollare il codice seguente in una cella del notebook.
Pitone
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Fare clic su Start per avviare un aggiornamento per la pipeline connessa.
Passaggio 3: Eseguire una query sui dati trasformati
In questo passaggio si interrogheranno i dati elaborati nella pipeline di ETL per analizzare i dati relativi al brano. Queste query usano i record preparati creati nel passaggio precedente.
Prima di tutto, eseguire una query che trova gli artisti che hanno pubblicato la maggior parte delle canzoni ogni anno dal 1990.
Nella barra laterale fare clic su
SQL Editor.
Fare clic sull'icona Aggiungi o più della nuova scheda e selezionare Crea nuova query dal menu.
Immetti gli elementi seguenti:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Sostituire
<catalog>
e<schema>
con il nome del catalogo e dello schema in cui si trova la tabella. Ad esempio:data_pipelines.songs_data.top_artists_by_year
.Fare clic su Esegui selezionato.
Ora, esegui un'altra query che trova canzoni con un ritmo 4/4 e un tempo adatto al ballo.
Fare clic
nuovo tocco e selezionare Crea nuova query dal menu.
Immettere il codice seguente:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Sostituire
<catalog>
e<schema>
con il nome del catalogo e dello schema in cui si trova la tabella. Ad esempio:data_pipelines.songs_data.songs_prepared
.Fare clic su Esegui selezionato.
Passaggio 4: Creare un'attività per eseguire la pipeline
Creare quindi un flusso di lavoro per automatizzare i passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Databricks.
- Nell'area di lavoro fare clic
Processi e pipeline nella barra laterale.
- In Nuovo fare clic su Lavoro.
- Nella casella titolo attività, sostituisci Nuovo Lavoro <data e ora> con il tuo nome lavoro. Ad esempio:
Songs workflow
. - In Nome attività immettere un nome per la prima attività,
ETL_songs_data
ad esempio . - In Tipo selezionare Pipeline.
- In Pipeline selezionare la pipeline creata nel passaggio 1.
- Fare clic su Crea.
- Per eseguire il flusso di lavoro, fare clic su Esegui ora. Per visualizzare i dettagli dell'esecuzione, fare clic sulla scheda Esecuzioni. Fare clic sul compito per visualizzare i dettagli dell'esecuzione del compito.
- Per visualizzare i risultati al termine del flusso di lavoro, fare clic su Vai all'ultima esecuzione riuscita o all'ora di inizio per l'esecuzione del processo. Viene visualizzata la pagina Output e vengono visualizzati i risultati della query.
Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.
Passaggio 5: Pianificare l'esecuzione della pipeline
Per eseguire la pipeline ETL in base a una pianificazione, seguire questa procedura:
- Passare all'interfaccia utente Processi e pipeline nella stessa area di lavoro di Azure Databricks del processo.
- Opzionalmente, selezionare i filtri Attività e Di mia proprietà.
- Nella colonna Nome, fare clic sul nome del processo. Nel pannello laterale sono visualizzati i dettagli del lavoro.
- Fare clic su Aggiungi trigger nel pannello Pianificazioni e trigger e selezionare Pianificato in Tipo di trigger.
- Specificare il periodo, l'ora di inizio e il fuso orario.
- Fare clic su Salva.
Altre informazioni
- Per altre informazioni sulle pipeline di elaborazione dati con pipeline dichiarative di Lakeflow, vedere Pipeline dichiarative di Lakeflow
- Per altre informazioni sui notebook di Databricks, vedere Introduzione ai notebook di Databricks.
- Per altre informazioni sui lavori Lakeflow, vedere Che cosa sono i lavori?
- Per altre informazioni su Delta Lake, vedere Che cos'è Delta Lake in Azure Databricks?