Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Meer informatie over het maken van een nieuwe pijplijn met behulp van Lakeflow Spark Declarative Pipelines (SDP) voor gegevensindeling en Automatisch laden. In deze handleiding wordt de voorbeeldpijplijn van gegevens uitgebreid door de gegevens te reinigen en een query te maken om de top 100 gebruikers te vinden.
In deze zelfstudie leert u hoe u de Lakeflow Pipelines Editor gebruikt voor het volgende:
- Maak een nieuwe pijplijn met de standaardmapstructuur en begin met een set voorbeeldbestanden.
- Beperkingen voor gegevenskwaliteit definiëren aan de hand van verwachtingen.
- Gebruik de editorfuncties om de pijplijn uit te breiden met een nieuwe transformatie om analyses uit te voeren op uw gegevens.
Requirements
Voordat u aan deze zelfstudie begint, moet u het volgende doen:
- Meld u aan bij een Azure Databricks-werkruimte.
- Laat Unity Catalog zijn ingeschakeld voor uw werkruimte.
- Zorg ervoor dat de Lakeflow-pijplijneneditor is ingeschakeld voor uw werkruimte en dat u zich heeft aangemeld. Zie De Editor voor Lakeflow Pipelines inschakelen en de bewaking bijwerken.
- U bent gemachtigd om een rekenresource te maken of toegang te krijgen tot een rekenresource.
- Machtigingen hebben om een nieuw schema in een catalogus te maken. De vereiste machtigingen zijn
ALL PRIVILEGESofUSE CATALOGenCREATE SCHEMA.
Stap 1: Een pijplijn maken
In deze stap maakt u een pijplijn met behulp van de standaardmapstructuur en codevoorbeelden. De codevoorbeelden verwijzen naar de users tabel in de wanderbricks voorbeeldgegevensbron.
Klik in uw Azure Databricks-werkruimte op
Nieuw en vervolgens
ETL-pijplijn. Hiermee opent u de pijplijneditor op de pagina Een pijplijn maken.
Klik op de koptekst om uw pijplijn een naam te geven.
Kies net onder de naam de standaardcatalogus en het standaardschema voor uw uitvoertabellen. Deze worden gebruikt wanneer u geen catalogus en schema opgeeft in uw pijplijndefinities.
Klik onder Volgende stap voor uw pijplijn op
Begin met voorbeeldcode in sql - of
Begin met voorbeeldcode in Python, op basis van uw taalvoorkeur. Hierdoor wordt de standaardtaal voor uw voorbeeldcode gewijzigd, maar u kunt later code toevoegen in de andere taal. Hiermee maakt u een standaardmapstructuur met voorbeeldcode om u op weg te helpen.
U kunt de voorbeeldcode bekijken in de browser voor pijplijnassets aan de linkerkant van de werkruimte. Hieronder,
transformationsstaan twee bestanden die elk één pijplijngegevensset genereren. Onderexplorationsis een notebook met code waarmee u de uitvoer van uw pijplijn kunt bekijken. Als u op een bestand klikt, kunt u de code in de editor bekijken en bewerken.De uitvoergegevenssets zijn nog niet gemaakt en de pijplijngrafiek aan de rechterkant van het scherm is leeg.
Als u de pijplijncode (de code in de
transformationsmap) wilt uitvoeren, klikt u in de rechterbovenhoek van het scherm op Pijplijn uitvoeren .Nadat de uitvoering is voltooid, ziet u in het onderste deel van de werkruimte de twee nieuwe tabellen die zijn gemaakt:
sample_users_<pipeline-name>ensample_aggregation_<pipeline-name>. U kunt ook zien dat in de pijplijngrafiek aan de rechterkant van de werkruimte nu de twee tabellen worden weergegeven, waaronder desample_usersbron voorsample_aggregation.
Stap 2: Gegevenskwaliteitscontroles toepassen
In deze stap voegt u een gegevenskwaliteitscontrole toe aan de sample_users tabel. U gebruikt pijplijnwachtingen om de gegevens te beperken. In dit geval verwijdert u alle gebruikersrecords die geen geldig e-mailadres hebben en voert u de opgeschoonde tabel uit als users_cleaned.
Klik in de browser voor pijplijnasset op
en selecteer Transformatie.
Maak in het dialoogvenster Nieuw transformatiebestand maken de volgende selecties:
- Kies Python of SQL voor de taal. Dit hoeft niet overeen te komen met uw vorige selectie.
- Geef het bestand een naam. Kies
users_cleanedin dit geval . - Laat voor het doelpad de standaardwaarde staan.
- Voor het type gegevensset laat u het op Geen geselecteerd staan of kiest u gematerialiseerde weergave. Als u gerealiseerde weergave selecteert, wordt voorbeeldcode voor u gegenereerd.
Bewerk de code in het nieuwe codebestand zodat deze overeenkomt met het volgende (gebruik SQL of Python op basis van uw selectie op het vorige scherm). Vervang
<pipeline-name>door de volledige naam voor desample_userstabel.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Python
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Klik op Pijplijn uitvoeren om de pijplijn bij te werken. Er moeten nu drie tabellen zijn.
Stap 3: De belangrijkste gebruikers analyseren
Haal vervolgens de top 100 gebruikers op aan de hand van het aantal boekingen dat zij hebben gemaakt. Voeg de wanderbricks.bookings tabel toe aan de users_cleaned gerealiseerde weergave.
Klik in de browser voor pijplijnasset op
en selecteer Transformatie.
Maak in het dialoogvenster Nieuw transformatiebestand maken de volgende selecties:
- Kies Python of SQL voor de taal. Dit hoeft niet overeen te komen met uw vorige selecties.
- Geef het bestand een naam. Kies
users_and_bookingsin dit geval . - Laat voor het doelpad de standaardwaarde staan.
- Voor het type gegevensset laat u het op Geen geselecteerd.
Bewerk de code in het nieuwe codebestand zodat deze overeenkomt met het volgende (gebruik SQL of Python op basis van uw selectie op het vorige scherm).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Python
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Klik op Pijplijn uitvoeren om de gegevenssets bij te werken. Wanneer de uitvoering is voltooid, ziet u in de pijplijngrafiek dat er vier tabellen zijn, inclusief de nieuwe
users_and_bookingstabel.
Volgende stappen
Nu u hebt geleerd hoe u enkele van de functies van de Lakeflow-pijplijneditor kunt gebruiken en een pijplijn kunt maken, vindt u hier enkele andere functies voor meer informatie over:
Hulpprogramma's voor het werken met transformaties en foutopsporing tijdens het maken van pijplijnen:
- Selectieve uitvoering
- Voorbeeldweergaven van gegevens
- Interactieve DAG (grafiek van de gegevenssets in uw pijplijn)
Ingebouwde Integratie van Databricks Asset Bundles voor efficiënte samenwerking, versiebeheer en CI/CD-integratie rechtstreeks vanuit de editor: