Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Meer informatie over het maken en implementeren van een ETL-pijplijn (extraheren, transformeren en laden) voor gegevensindeling met behulp van Lakeflow Declarative Pipelines en Auto Loader. Een ETL-pijplijn implementeert de stappen voor het lezen van gegevens uit bronsystemen, het transformeren van die gegevens op basis van vereisten, zoals gegevenskwaliteitscontroles en recordontdubbeling en het schrijven van de gegevens naar een doelsysteem, zoals een datawarehouse of een data lake.
In deze zelfstudie gebruikt u Lakeflow Declarative Pipelines and Auto Loader voor het volgende:
- Onbewerkte brongegevens opnemen in een doeltabel.
- Transformeer de onbewerkte brongegevens en schrijf de getransformeerde gegevens naar twee gerealiseerde doelweergaven.
- Voer een query uit op de getransformeerde gegevens.
- Automatiseer de ETL-pijplijn met een Databricks-taak.
Zie Lakeflow Declarative Pipelines en Wat is Auto Loader? voor meer informatie over Lakeflow Declarative Pipelines en Auto Loader.
Vereisten
Als u deze zelfstudie wilt voltooien, moet u aan de volgende vereisten voldoen:
- Meld u aan bij een Azure Databricks-werkruimte.
- Laat Unity Catalog zijn ingeschakeld voor uw werkruimte.
- Serverloze rekenkracht is ingeschakeld voor uw account. Serverloze Lakeflow-declaratieve pijplijnen zijn niet beschikbaar in alle werkruimteregio's. Zie Functies met beperkte regionale beschikbaarheid voor beschikbare regio's.
- U bent gemachtigd om een rekenresource te maken of toegang te krijgen tot een rekenresource.
- Machtigingen hebben om een nieuw schema in een catalogus te maken. De vereiste machtigingen zijn
ALL PRIVILEGES
ofUSE CATALOG
enCREATE SCHEMA
. - Machtiging hebben om een nieuw volume in een bestaand schema te maken. De vereiste machtigingen zijn
ALL PRIVILEGES
ofUSE SCHEMA
enCREATE VOLUME
.
Over de gegevensset
De gegevensset die in dit voorbeeld wordt gebruikt, is een subset van de Million Song Dataset, een verzameling functies en metagegevens voor hedendaagse muzieknummers. Deze gegevensset is beschikbaar in de voorbeeldgegevenssets die zijn opgenomen in uw Azure Databricks-werkruimte.
stap 1: een pijplijn maken
Eerst maakt u een ETL-pijplijn in Lakeflow-declaratieve pijplijnen. Declaratieve pijplijnen van Lakeflow maken pijplijnen door afhankelijkheden op te lossen die zijn gedefinieerd in notebooks of bestanden ( broncode genoemd) met behulp van de syntaxis van Lakeflow Declarative Pipelines. Elk broncodebestand kan slechts één taal bevatten, maar u kunt meerdere taalspecifieke notebooks of bestanden toevoegen in de pijplijn. Zie Lakeflow Declarative Pipelines voor meer informatie
Belangrijk
Laat het veld Broncode leeg om automatisch een notebook te maken en te configureren voor creatie van broncode.
In deze zelfstudie wordt gebruikgemaakt van serverloze compute en Unity Catalog. Gebruik de standaardinstellingen voor alle configuratieopties die niet zijn opgegeven. Als serverloze berekeningen niet zijn ingeschakeld of ondersteund in uw werkruimte, kunt u de zelfstudie voltooien zoals geschreven met behulp van de standaard-rekeninstellingen. Als u standaard rekeninstellingen gebruikt, moet u Unity Catalog handmatig selecteren onder Opslagopties in de sectie Bestemming van de gebruikersinterface pijplijn maken .
Voer de volgende stappen uit om een nieuwe ETL-pijplijn te maken in Lakeflow Declarative Pipelines:
- Klik in uw werkruimte op
Taken en pijplijnen in de zijbalk.
- Klik onder Nieuw op ETL-pijplijn.
- Typ in pijplijnnaam een unieke pijplijnnaam.
- Schakel het selectievakje Serverloos in.
- Als u in Bestemming een Unity Catalog-locatie wilt configureren waar tabellen worden gepubliceerd, selecteert u een bestaande catalogus en schrijft u een nieuwe naam in schema om een nieuw schema in uw catalogus te maken.
- Klik op Maken.
De gebruikersinterface van de pijplijn wordt weergegeven voor de nieuwe pijplijn.
Stap 2: Een pijplijn ontwikkelen
Belangrijk
Notebooks kunnen slechts één programmeertaal bevatten. Meng geen Python- en SQL-code in notebooks met broncode voor pijplijnen.
In deze stap gebruikt u Databricks Notebooks om interactief broncode te ontwikkelen en te valideren voor Lakeflow-declaratieve pijplijnen.
De code maakt gebruik van automatisch laadprogramma voor incrementele gegevensopname. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten. Voor meer informatie, zie Wat is Auto Loader?
Er wordt automatisch een leeg broncode-notitieboek gemaakt en geconfigureerd voor de pijplijn. Het notitieblok wordt gemaakt in een nieuwe map in uw gebruikersmap. De naam van de nieuwe map en het nieuwe bestand komen overeen met de naam van uw pijplijn. Bijvoorbeeld: /Users/someone@example.com/my_pipeline/my_pipeline
.
Bij het ontwikkelen van een pijplijn kunt u Python of SQL kiezen. Voorbeelden zijn opgenomen voor beide talen. Controleer op basis van uw taalkeuze of u de standaardnotitiebloktaal selecteert. Zie ETL-pijplijnen ontwikkelen en fouten opsporen in ETL-pijplijnen met een notebook in Lakeflow-declaratieve pijplijnen voor meer informatie over notebookondersteuning voor codeontwikkeling van Lakeflow-declaratieve pijplijnen.
Een koppeling voor toegang tot dit notebook bevindt zich onder het veld Broncode in het deelvenster Pijplijndetails . Klik op de koppeling om het notitieblok te openen voordat u doorgaat met de volgende stap.
Klik in de rechterbovenhoek op Verbinden om het configuratiemenu voor berekeningen te openen.
Beweeg de muisaanwijzer over de naam van de pijplijn die u in stap 1 hebt gemaakt.
Klik op Verbinden.
Selecteer naast de titel van uw notitieblok bovenaan de standaardtaal van het notitieblok (Python of SQL).
Kopieer en plak de volgende code in een cel in het notebook.
Python
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Klik op Start om een update voor de verbonden pijplijn te starten.
Stap 3: Een query uitvoeren op de getransformeerde gegevens
In deze stap voert u een query uit op de gegevens die in de ETL-pijplijn worden verwerkt om de nummergegevens te analyseren. Deze query's maken gebruik van de voorbereide records die in de vorige stap zijn gemaakt.
Voer eerst een query uit die de artiesten vindt die elk jaar de meeste nummers hebben uitgebracht sinds 1990.
Klik in de zijbalk op
SQL Editor.
Klik op het
en selecteer Nieuwe query maken in het menu.
Voer het volgende in:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Vervang
<catalog>
en<schema>
door de naam van de catalogus en het schema waarin de tabel zich bevindt. Bijvoorbeeld:data_pipelines.songs_data.top_artists_by_year
.Klik op Selectie uitvoeren.
Voer nu een andere query uit die nummers vindt met een 4/4 maat en een dansbaar tempo.
Klik op het
en selecteer Nieuwe query maken in het menu.
Voer de volgende code in:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Vervang
<catalog>
en<schema>
door de naam van de catalogus en het schema waarin de tabel zich bevindt. Bijvoorbeeld:data_pipelines.songs_data.songs_prepared
.Klik op Selectie uitvoeren.
Stap 4: Een taak maken om de pijplijn uit te voeren
Maak vervolgens een werkstroom om stappen voor gegevensopname, verwerking en analyse te automatiseren met behulp van een Databricks-taak.
- Klik in uw werkruimte op
Taken en pijplijnen in de zijbalk.
- Klik onder Nieuw op Taak.
- Vervang in het vak Taaktitel de datum en tijd< van de nieuwe taak > door de naam van uw taak. Bijvoorbeeld:
Songs workflow
. - Voer in De naam van de taak een naam in voor de eerste taak, bijvoorbeeld
ETL_songs_data
. - Selecteer Pijplijn in Type.
- Selecteer in Pijplijn de pijplijn die u in stap 1 hebt gemaakt.
- Klik op Maken.
- Klik op Nu uitvoeren om de werkstroom uit te voeren. Als u de details van de uitvoering wilt weergeven, klikt u op het tabblad Uitvoeringen . Klik op de taak om details voor de taakuitvoering weer te geven.
- Als u de resultaten wilt weergeven wanneer de werkstroom is voltooid, klikt u op Ga naar de meest recente geslaagde uitvoering of de begintijd voor de taakuitvoering. De uitvoerpagina wordt weergegeven en de queryresultaten worden weergegeven.
Zie Bewaking en waarneembaarheid voor Lakeflow-taken voor meer informatie over taakuitvoeringen.
Stap 5: De pijplijntaak plannen
Voer de volgende stappen uit om de ETL-pijplijn volgens een schema uit te voeren:
- Navigeer naar de Jobs & Pipelines gebruikersinterface in dezelfde Azure Databricks-werkruimte als de job.
- Selecteer desgewenst de filters Werk en Van mij.
- Klik in de kolom Naam op de taaknaam. In het zijpaneel worden de taakgegevens weergegeven.
- Klik op Trigger toevoegen in het deelvenster Planningen en triggers en selecteer Gepland in triggertype.
- Geef de periode, begintijd en tijdzone op.
- Klik op Opslaan.
Meer informatie
- Zie Lakeflow Declarative Pipelines voor meer informatie over pijplijnen voor gegevensverwerking met Lakeflow Declarative Pipelines
- Zie Databricks Notebooks voor meer informatie over Databricks Notebooks.
- Zie Wat zijn taken voor meer informatie over Lakeflow-taken?
- Zie Wat is Delta Lake in Azure Databricks? voor meer informatie over Delta Lake.