Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa esercitazione illustra come creare e distribuire una pipeline ETL (estrazione, trasformazione e caricamento) per l'orchestrazione dei dati usando le pipeline dichiarative di Lakeflow Spark e il caricatore automatico. Una pipeline ETL implementa i passaggi per leggere i dati dai sistemi di origine, trasformare tali dati in base ai requisiti, ad esempio i controlli della qualità dei dati e registrare la deduplicazione dei dati e scrivere i dati in un sistema di destinazione, ad esempio un data warehouse o un data lake.
In questa esercitazione si useranno le pipeline e il caricatore automatico per:
- Inserire dati di origine non elaborati in una tabella di destinazione.
- Trasformare i dati di origine non elaborati e scrivere i dati trasformati in due viste materializzate di destinazione.
- Interrogare i dati trasformati.
- Automatizzare la pipeline ETL con un lavoro di Databricks.
Per altre informazioni sulle pipeline e sul caricatore automatico, vedere Pipeline dichiarative di Lakeflow Spark e Che cos'è il caricatore automatico?
Requisiti
Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:
- Accedere a un'area di lavoro di Azure Databricks.
- Assicurati che Unity Catalog sia abilitato per la tua area di lavoro.
- L'abilitazione del calcolo serverless è attiva per il tuo account. Le pipeline dichiarative di Lakeflow Spark serverless non sono disponibili in tutte le aree dell'area di lavoro. Vedere Funzionalità con disponibilità a livello di area limitata per le aree disponibili.
- Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
- Disporre delle autorizzazioni per creare un nuovo schema in un catalogo. Le autorizzazioni necessarie sono
ALL PRIVILEGESoUSE CATALOGeCREATE SCHEMA. - Disporre delle autorizzazioni per creare un nuovo volume in uno schema esistente. Le autorizzazioni necessarie sono
ALL PRIVILEGESoUSE SCHEMAeCREATE VOLUME.
Informazioni sul set di dati
Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.
Passaggio 1: Creare una pipeline
Creare prima di tutto una pipeline definendo i set di dati nei file (denominato codice sorgente) usando la sintassi della pipeline. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più file specifici della lingua nella pipeline. Per altre informazioni, vedere Pipeline dichiarative di Lakeflow Spark
Questo tutorial usa il calcolo serverless e Unity Catalog. Per tutte le opzioni di configurazione non specificate, usare le impostazioni predefinite. Se l'ambiente di calcolo serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite.
Per creare una nuova pipeline, seguire questa procedura:
- Nell'area di lavoro fare clic
Novità nella barra laterale, quindi selezionare Pipeline ETL.
- Dai alla pipeline un nome univoco.
- Appena sotto il nome, selezionare il catalogo predefinito e lo schema per i dati generati. È possibile specificare altre destinazioni nelle trasformazioni, ma questa esercitazione usa queste impostazioni predefinite. È necessario disporre delle autorizzazioni per il catalogo e lo schema creati. Vedere Requisiti.
- Per questa esercitazione selezionare Inizia con un file vuoto.
- In Percorso cartella specificare un percorso per i file di origine o accettare l'impostazione predefinita (cartella utente).
- Scegliere Python o SQL come linguaggio per il primo file di origine (una pipeline può combinare e trovare le lingue corrispondenti, ma ogni file deve trovarsi in un unico linguaggio).
- Fare clic su Seleziona.
Viene visualizzato l'editor della pipeline per la nuova pipeline. Viene creato un file di origine vuoto per la lingua, pronto per la prima trasformazione.
Passaggio 2: Sviluppare la logica della pipeline
In questo passaggio si userà l'editor di Pipelines Lakeflow per sviluppare e convalidare il codice sorgente per la pipeline in modo interattivo.
Il codice usa il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud. Per altre informazioni, vedere Che cos'è il caricatore automatico?
Un file di codice sorgente vuoto viene creato e configurato automaticamente per la pipeline. Il file viene creato nella cartella trasformazioni della pipeline. Per impostazione predefinita, tutti i file *.py e *.sql nella cartella delle trasformazioni sono inclusi nella sorgente della tua pipeline.
Copiare e incollare il codice seguente nel file di origine. Assicurarsi di usare la lingua selezionata per il file nel passaggio 1.
Pitone
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Questa fonte include il codice per tre interrogazioni. È anche possibile inserire tali query in file separati per organizzare i file e scrivere il codice nel modo preferito.
Fare clic
Eseguire il file o Eseguire la pipeline per avviare l'aggiornamento della pipeline connessa. Con un solo file di origine nella pipeline, questi sono equivalenti a livello funzionale.
Al termine dell'aggiornamento, l'editor viene aggiornato con informazioni sulla pipeline.
- Il grafico della pipeline (DAG), nella barra laterale a destra del codice, mostra tre tabelle,
songs_raw,songs_preparedetop_artists_by_year. - Nella parte superiore del browser degli asset della pipeline viene visualizzato un riepilogo dell'aggiornamento.
- I dettagli delle tabelle generate vengono visualizzati nel riquadro inferiore ed è possibile esplorare i dati dalle tabelle selezionando uno.
Sono inclusi i dati non elaborati e puliti, nonché alcune semplici analisi per trovare i migliori artisti per anno. Nel passaggio successivo si creano query ad hoc per un'ulteriore analisi in un file separato nella pipeline.
Passaggio 3: Esplorare i set di dati creati dalla pipeline
In questo passaggio si eseguono query ad hoc sui dati elaborati nella pipeline ETL per analizzare i dati del brano nell'editor SQL di Databricks. Queste query usano i record preparati creati nel passaggio precedente.
Prima di tutto, eseguire una query che trova gli artisti che hanno pubblicato la maggior parte delle canzoni ogni anno dal 1990.
Nella barra laterale del browser asset della pipeline fare clic
Aggiungere quindi Esplorazione.
Immettere un nome e selezionare SQL per il file di esplorazione. Un notebook SQL viene creato in una nuova
explorationscartella. I file nellaexplorationscartella non vengono eseguiti come parte di un aggiornamento della pipeline per impostazione predefinita. Il notebook SQL include celle che è possibile eseguire insieme o separatamente.Per creare una tabella di artisti che rilasciano la maggior parte dei brani di ogni anno dopo il 1990, immettere il codice seguente nel nuovo file SQL (se nel file è presente codice di esempio, sostituirlo). Poiché questo notebook non fa parte della pipeline, non usa il catalogo e lo schema predefiniti. Sostituire
<catalog>.<schema>con il catalogo e lo schema usati come valori predefiniti per la pipeline.-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Fare clic
premere
Shift + Enterper eseguire questa query.
Ora, esegui un'altra query che trova canzoni con un ritmo 4/4 e un tempo adatto al ballo.
Aggiungere il codice seguente alla cella successiva nello stesso file. Anche in questo caso, sostituire
<catalog>.<schema>con il catalogo e lo schema usati come valori predefiniti per la pipeline:-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Fare clic
premere
Shift + Enterper eseguire questa query.
Passaggio 4: Creare un'attività per eseguire la pipeline
Creare quindi un flusso di lavoro per automatizzare i passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Databricks eseguito in base a una pianificazione.
- Nella parte superiore dell'editor scegliere il pulsante Pianifica .
- Se viene visualizzata la finestra di dialogo Pianificazioni , scegliere Aggiungi pianificazione.
- Verrà visualizzata la finestra di dialogo Nuova pianificazione , in cui è possibile creare un processo per eseguire la pipeline in base a una pianificazione.
- Facoltativamente, dare un nome al job.
- Per impostazione predefinita, la pianificazione è impostata per l'esecuzione una volta al giorno. Puoi accettare questo predefinito o impostare un programma personalizzato. Scegliendo Avanzate è possibile impostare un orario specifico in cui verrà eseguito il processo. Se si seleziona Altre opzioni , è possibile creare notifiche durante l'esecuzione del processo.
- Seleziona Crea per applicare le modifiche e creare l'attività.
Ora l'attività verrà eseguita ogni giorno per mantenere aggiornata la pipeline. È possibile scegliere di nuovo Pianifica per visualizzare l'elenco delle pianificazioni. È possibile gestire le pianificazioni per la pipeline da tale finestra di dialogo, inclusa l'aggiunta, la modifica o la rimozione di pianificazioni.
Facendo clic sul nome del programma (o del processo), si accede alla pagina del processo nell'elenco Processi e pipeline. Da qui è possibile visualizzare i dettagli sulle esecuzioni dei processi, inclusa la cronologia delle esecuzioni o eseguire immediatamente il processo con il pulsante Esegui adesso .
Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.
Altre informazioni
- Per altre informazioni sulle pipeline di elaborazione dati, vedere Pipeline dichiarative di Lakeflow Spark
- Per ulteriori informazioni sui Notebook di Databricks, vedi Notebook di Databricks.
- Per altre informazioni sui lavori Lakeflow, vedere Che cosa sono i lavori?
- Per altre informazioni su Delta Lake, vedere Che cos'è Delta Lake in Azure Databricks?