Delen via


Lakehouse-zelfstudie: Gegevens voorbereiden en transformeren in lakehouse

In deze handleiding gebruikt u notebooks met Spark runtime om onbewerkte gegevens in uw lakehouse te transformeren en gereedmaken.

Vereisten

Als u geen lakehouse hebt dat gegevens bevat, moet u het volgende doen:

Gegevens voorbereiden

Uit de vorige zelfstudie stappen zijn onbewerkte gegevens ingevoerd uit de bron naar de sectie Bestanden van het lakehouse. U kunt deze gegevens nu transformeren en voorbereiden voor het maken van Delta-tabellen.

  1. Download de notebooks uit de map Lakehouse-zelfstudie Broncode.

  2. In de werkruimte selecteer importeren>Notebook>vanaf deze computer.

  3. Selecteer Notitieblok importeren in de Nieuwe sectie bovenaan de landingspagina.

  4. Selecteer Upload vanuit het deelvenster Importstatus dat aan de rechterkant van het scherm wordt geopend.

  5. Selecteer alle notitieblokken die u in de eerste stap van deze sectie hebt gedownload.

    Schermopname die laat zien waar u de gedownloade notitieblokken en de knop Openen kunt vinden.

  6. Selecteer Openen. Er wordt een melding weergegeven die de status van de import aangeeft in de rechterbovenhoek van het browservenster.

  7. Nadat het importeren is voltooid, gaat u naar de itemsweergave van de werkruimte en ziet u de zojuist geïmporteerde notitieblokken. Selecteer wwilakehouse lakehouse om het te openen.

    Screenshot van de lijst met geïmporteerde notitieblokken en de plek waar je het lakehouse kunt selecteren.

  8. Zodra het wwilakehouse Lakehouse is geopend, selecteert u Notitieblok openen>Bestaand notitieblok in het bovenste navigatiemenu.

    Schermopname van de lijst met succesvol geïmporteerde notitieblokken.

  9. Selecteer in de lijst met bestaande notitieblokken het notitieblok 01 - Create Delta Tables en selecteer Open.

  10. In het geopende notitieblok in Lakehouse Explorer ziet u dat het notitieblok al is gekoppeld aan uw geopende lakehouse.

    Notitie

    Fabric biedt de V-ordermogelijkheid voor het schrijven van geoptimaliseerde Delta Lake-bestanden. V-volgorde verbetert vaak compressie met drie tot vier keer en tot 10 keer versnelling van de prestaties, in vergelijking met de niet-geoptimaliseerde Delta Lake-bestanden. Spark in Fabric optimaliseert partities dynamisch tijdens het genereren van bestanden met een standaardgrootte van 128 MB. De grootte van het doelbestand kan worden gewijzigd per workload met behulp van configuraties.

    Met de functionaliteit voor het optimaliseren van schrijfbewerkingen vermindert de Apache Spark-engine het aantal geschreven bestanden en is gericht op het vergroten van de afzonderlijke bestandsgrootte van de geschreven gegevens.

  11. Voordat u gegevens schrijft als Delta Lake-tabellen in de sectie Tabellen van lakehouse, gebruikt u twee Fabric-functies (V-volgorde en Schrijf optimaliseren) voor geoptimaliseerde schrijfbewerking van gegevens en voor verbeterde leesprestaties. Als u deze functies in uw sessie wilt inschakelen, stelt u deze configuraties in in de eerste cel van uw notebook.

    Als u het notebook wilt starten en alle cellen op volgorde wilt uitvoeren, selecteert u Alles uitvoeren op het bovenste lint (onder Start). Als u alleen code uit een specifieke cel wilt uitvoeren, selecteert u het pictogram Uitvoeren dat links van de cel wordt weergegeven wanneer u de muisaanwijzer plaatst of drukt u op Shift+Enter op het toetsenbord terwijl het besturingselement zich in de cel bevindt.

    Schermopname van een spark-sessieconfiguratiescherm, inclusief een codecel en pictogram Uitvoeren.

    Bij het uitvoeren van een cel hoeft u de specifieke details van de onderliggende Spark-pool of cluster niet op te geven, omdat Fabric deze levert via de Live Pool. Elke Fabric-werkruimte wordt geleverd met een standaard Spark-pool, livepool genoemd. Dit betekent dat wanneer u notebooks maakt, u zich geen zorgen hoeft te maken over het opgeven van Spark-configuraties of clusterdetails. Wanneer u de eerste notebookopdracht uitvoert, is de livepool binnen enkele seconden actief. De Spark-sessie wordt tot stand gebracht en de code wordt uitgevoerd. Volgende code-uitvoering is bijna onmiddellijk in dit notebook terwijl de Spark-sessie actief is.

  12. Vervolgens leest u onbewerkte gegevens uit de sectie Bestanden van het lakehouse en voegt u meer kolommen toe voor verschillende datumonderdelen als onderdeel van de transformatie. Ten slotte gebruikt u de partitie van de Spark-API om de gegevens te partitioneren voordat u deze schrijft als Delta-tabelindeling op basis van de zojuist gemaakte kolommen voor gegevensonderdelen (Jaar en Kwartaal).

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. Nadat de feitentabellen zijn geladen, kunt u doorgaan met het laden van gegevens voor de rest van de dimensies. In de volgende cel wordt een functie gemaakt om onbewerkte gegevens te lezen uit de sectie Bestanden van het lakehouse voor elke tabelnaam die als parameter is doorgegeven. Vervolgens wordt er een lijst met dimensietabellen gemaakt. Ten slotte wordt de lijst met tabellen doorlopen en wordt er een Delta-tabel gemaakt voor elke tabelnaam die wordt gelezen uit de invoerparameter. Houd er rekening mee dat in het script de kolom met de naam Photo in dit voorbeeld wordt verwijderd omdat de kolom niet wordt gebruikt.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_customer',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. Als u de gemaakte tabellen wilt valideren, klikt u met de rechtermuisknop en selecteert u verversen in de wwilakehouse lakehouse. De tabellen worden weergegeven.

    Schermopname die laat zien waar u uw gemaakte tabellen kunt vinden in Lakehouse Explorer.

  15. Ga opnieuw naar de itemsweergave van de werkruimte en selecteer het wwilakehouse lakehouse om het te openen.

  16. Open nu het tweede notitieblok. In de weergave Lakehouse, selecteer Notitieblok openen>Bestaand notitieblok op het lint.

  17. Selecteer in de lijst met bestaande notitieblokken het 02 - Gegevenstransformatie - Zakelijke notitieblok om het te openen.

    Schermopname van het menu Bestaand notitieblok openen, waarin wordt weergegeven waar u uw notitieblok kunt selecteren.

  18. In het geopende notitieblok in Lakehouse Explorer ziet u dat het notitieblok al is gekoppeld aan uw geopende lakehouse.

  19. Een organisatie kan gegevenstechnici hebben die werken met Scala/Python en andere data engineers die werken met SQL (Spark SQL of T-SQL), die allemaal aan dezelfde kopie van de gegevens werken. Fabric maakt het mogelijk voor deze verschillende groepen, met gevarieerde ervaring en voorkeur, om te werken en samen te werken. De twee verschillende benaderingen transformeren en genereren bedrijfsaggregaties. U kunt de oplossing kiezen die geschikt is voor u of combineert deze benaderingen op basis van uw voorkeur zonder afbreuk te doen aan de prestaties:

    • Benadering #1 - Gebruik PySpark om gegevens samen te voegen en te aggregeren voor het genereren van zakelijke samenvattingen. Deze benadering verdient de voorkeur aan iemand met een programmeerachtergrond (Python of PySpark).

    • Benadering 2 - Gebruik Spark SQL om gegevens te verbinden en te aggregeren voor het genereren van bedrijfsaggregaties. Deze benadering verdient de voorkeur aan iemand met SQL-achtergrond, die overstapt naar Spark.

  20. Benadering #1 (sale_by_date_city): Gebruik PySpark om gegevens te combineren en te aggregeren voor het genereren van bedrijfsaggregaties. Met de volgende code maakt u drie verschillende Spark-gegevensframes, die elk verwijzen naar een bestaande Delta-tabel. Vervolgens koppelt u deze tabellen met behulp van de dataframes, voert u een groepsfunctie uit om aggregaties te genereren, wijzigt u de naam van een paar kolommen en schrijft u het ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse om de gegevens te behouden.

    In deze cel maakt u drie verschillende Spark-gegevensframes, die elk verwijzen naar een bestaande Delta-tabel.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    Voeg de volgende code toe aan dezelfde cel om deze tabellen samen te voegen met behulp van de eerder gemaakte dataframes. Groepeer op om aggregatie te genereren, wijzig de naam van een paar kolommen en schrijf deze ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. Benadering 2 (sale_by_date_employee): Gebruik Spark SQL om gegevens te koppelen en te aggregeren voor het genereren van zakelijke aggregaties. Met de volgende code maakt u een tijdelijke Spark-weergave door drie tabellen samen te voegen, groepeer op om aggregatie te genereren en wijzigt u enkele kolommen. Ten slotte leest u uit de tijdelijke Spark-weergave en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van lakehouse om deze te behouden met de gegevens.

    In deze cel maakt u een tijdelijke Spark-weergave door drie tabellen samen te voegen, groeperen op om aggregatie te genereren en een aantal kolommen een andere naam te geven.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    In deze cel leest u uit de tijdelijke Spark-weergave die in de vorige cel is gemaakt en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. Om de gemaakte tabellen te valideren, klik met de rechtermuisknop op het lakehouse wwilakehouse en selecteer Vernieuwen. De samengevoegde tabellen worden weergegeven.

    Schermopname van Lakehouse Explorer waarin wordt weergegeven waar de nieuwe tabellen worden weergegeven.

De twee benaderingen produceren een vergelijkbaar resultaat. Als u de noodzaak voor het leren van een nieuwe technologie of inbreuk op prestaties wilt minimaliseren, kiest u de methode die het beste bij uw achtergrond en voorkeur past.

Mogelijk merkt u dat u gegevens schrijft als Delta Lake-bestanden. De automatische functie voor tabeldetectie en -registratie van Fabric wordt opgehaald en geregistreerd in de metastore. U hoeft niet expliciet instructies aan te roepen CREATE TABLE om tabellen te maken die u met SQL wilt gebruiken.

Volgende stap