Share via


Lakehouse-zelfstudie: Gegevens voorbereiden en transformeren in lakehouse

In deze zelfstudie gebruikt u notebooks met Spark Runtime om onbewerkte gegevens in uw lakehouse te transformeren en voorbereiden.

Vereisten

Als u geen lakehouse hebt dat gegevens bevat, moet u het volgende doen:

Gegevens voorbereiden

Uit de vorige zelfstudiestappen zijn onbewerkte gegevens opgenomen uit de bron naar de sectie Bestanden van het lakehouse. U kunt deze gegevens nu transformeren en voorbereiden voor het maken van Delta-tabellen.

  1. Download de notebooks uit de map Broncode van de Lakehouse-zelfstudie.

  2. Selecteer Data-engineer ing in de schakeloptie linksonder in het scherm.

    Schermopname die laat zien waar u de switcher kunt vinden en Data-engineer ing selecteert.

  3. Selecteer Notitieblok importeren in de sectie Nieuw boven aan de landingspagina.

  4. Selecteer Uploaden in het deelvenster Status importeren dat aan de rechterkant van het scherm wordt geopend.

  5. Selecteer alle notitieblokken die u in de eerste stap van deze sectie hebt gedownload.

    Schermopname die laat zien waar u de gedownloade notitieblokken en de knop Openen kunt vinden.

  6. Selecteer Openen. Er wordt een melding weergegeven die de status van de import aangeeft in de rechterbovenhoek van het browservenster.

  7. Nadat het importeren is voltooid, gaat u naar de itemsweergave van de werkruimte en ziet u de zojuist geïmporteerde notitieblokken. Selecteer wwilakehouse lakehouse om het te openen.

    Schermopname van de lijst met geïmporteerde notitieblokken en waar het lakehouse moet worden geselecteerd.

  8. Zodra het wwilakehouse Lakehouse is geopend, selecteert u Bestaand notitieblok> openen in het bovenste navigatiemenu.

    Schermopname van de lijst met geïmporteerde notitieblokken.

  9. Selecteer in de lijst met bestaande notitieblokken het notitieblok 01 - Delta Tables maken en open.

  10. In het geopende notitieblok in Lakehouse Explorer ziet u dat het notitieblok al is gekoppeld aan uw geopende lakehouse.

    Notitie

    Fabric biedt de V-ordermogelijkheid voor het schrijven van geoptimaliseerde Delta Lake-bestanden. V-volgorde verbetert vaak compressie met drie tot vier keer, en tot 10 keer, prestatieversnelling over de Delta Lake-bestanden die niet zijn geoptimaliseerd. Spark in Fabric optimaliseert partities dynamisch tijdens het genereren van bestanden met een standaardgrootte van 128 MB. De grootte van het doelbestand kan worden gewijzigd per workload met behulp van configuraties.

    Met de functionaliteit voor het optimaliseren van schrijfbewerkingen vermindert de Apache Spark-engine het aantal geschreven bestanden en is gericht op het vergroten van de afzonderlijke bestandsgrootte van de geschreven gegevens.

  11. Voordat u gegevens schrijft als Delta Lake-tabellen in de sectie Tabellen van lakehouse, gebruikt u twee Fabric-functies (V-volgorde en Schrijf optimaliseren) voor geoptimaliseerde schrijfbewerking van gegevens en voor verbeterde leesprestaties. Als u deze functies in uw sessie wilt inschakelen, stelt u deze configuraties in in de eerste cel van uw notebook.

    Als u het notebook wilt starten en alle cellen op volgorde wilt uitvoeren, selecteert u Alles uitvoeren op het bovenste lint (onder Start). Als u alleen code uit een specifieke cel wilt uitvoeren, selecteert u het pictogram Uitvoeren dat links van de cel wordt weergegeven wanneer u de muisaanwijzer plaatst of drukt u op Shift+Enter op het toetsenbord terwijl het besturingselement zich in de cel bevindt.

    Schermopname van een spark-sessieconfiguratiescherm, inclusief een codecel en pictogram Uitvoeren.

    Bij het uitvoeren van een cel hoeft u de onderliggende Spark-pool of clusterdetails niet op te geven, omdat Fabric deze via livepool levert. Elke Fabric-werkruimte wordt geleverd met een standaard Spark-pool, livepool genoemd. Dit betekent dat wanneer u notebooks maakt, u zich geen zorgen hoeft te maken over het opgeven van Spark-configuraties of clusterdetails. Wanneer u de eerste notebookopdracht uitvoert, is de livepool binnen enkele seconden actief en actief. De Spark-sessie wordt tot stand gebracht en de code wordt uitgevoerd. Volgende code-uitvoering is bijna onmiddellijk in dit notebook terwijl de Spark-sessie actief is.

  12. Vervolgens leest u onbewerkte gegevens uit de sectie Bestanden van het lakehouse en voegt u meer kolommen toe voor verschillende datumonderdelen als onderdeel van de transformatie. Ten slotte gebruikt u de partitie van de Spark-API om de gegevens te partitioneren voordat u deze schrijft als Delta-tabelindeling op basis van de zojuist gemaakte kolommen voor gegevensonderdelen (Jaar en Kwartaal).

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. Nadat de feitentabellen zijn geladen, kunt u doorgaan met het laden van gegevens voor de rest van de dimensies. In de volgende cel wordt een functie gemaakt om onbewerkte gegevens te lezen uit de sectie Bestanden van het lakehouse voor elke tabelnaam die als parameter is doorgegeven. Vervolgens wordt er een lijst met dimensietabellen gemaakt. Ten slotte wordt de lijst met tabellen doorlopen en wordt er een Delta-tabel gemaakt voor elke tabelnaam die wordt gelezen uit de invoerparameter. Houd er rekening mee dat in het script de kolom met de naam Photo in dit voorbeeld wordt verwijderd omdat de kolom niet wordt gebruikt.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. Als u de gemaakte tabellen wilt valideren, klikt u met de rechtermuisknop en selecteert u vernieuwen in het lakehouse wwilakehouse . De tabellen worden weergegeven.

    Schermopname die laat zien waar u uw gemaakte tabellen kunt vinden in Lakehouse Explorer.

  15. Ga opnieuw naar de itemsweergave van de werkruimte en selecteer het lakehouse wwilakehouse om deze te openen.

  16. Open nu het tweede notitieblok. Selecteer In de weergave Lakehouse het bestaande notitieblok> openen op het lint.

  17. Selecteer in de lijst met bestaande notitieblokken het 02 - Gegevenstransformatie - Zakelijke notitieblok om het te openen.

    Schermopname van het menu Bestaand notitieblok openen, waarin wordt weergegeven waar u uw notitieblok kunt selecteren.

  18. In het geopende notitieblok in Lakehouse Explorer ziet u dat het notitieblok al is gekoppeld aan uw geopende lakehouse.

  19. Een organisatie kan gegevenstechnici hebben die werken met Scala/Python en andere data engineers die werken met SQL (Spark SQL of T-SQL), die allemaal aan dezelfde kopie van de gegevens werken. Fabric maakt het mogelijk voor deze verschillende groepen, met gevarieerde ervaring en voorkeur, om te werken en samen te werken. De twee verschillende benaderingen transformeren en genereren bedrijfsaggregaties. U kunt de oplossing kiezen die geschikt is voor u of combineert deze benaderingen op basis van uw voorkeur zonder afbreuk te doen aan de prestaties:

    • Benadering #1 : Gebruik PySpark om gegevens samen te voegen en samen te voegen voor het genereren van bedrijfsaggregaties. Deze benadering verdient de voorkeur aan iemand met een programmeerachtergrond (Python of PySpark).

    • Benadering 2 : Gebruik Spark SQL om gegevens samen te voegen en samen te voegen voor het genereren van bedrijfsaggregaties. Deze benadering verdient de voorkeur aan iemand met SQL-achtergrond, die overstapt naar Spark.

  20. Benadering #1 (sale_by_date_city): gebruik PySpark om gegevens samen te voegen en samen te voegen voor het genereren van bedrijfsaggregaties. Met de volgende code maakt u drie verschillende Spark-gegevensframes, die elk verwijzen naar een bestaande Delta-tabel. Vervolgens koppelt u deze tabellen met behulp van de dataframes, groepeert u op om aggregatie te genereren, wijzigt u de naam van een paar kolommen en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse om deze samen te voegen met de gegevens.

    In deze cel maakt u drie verschillende Spark-gegevensframes, die elk verwijzen naar een bestaande Delta-tabel.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    In deze cel koppelt u deze tabellen aan de hand van de eerder gemaakte dataframes, groepeert u op om aggregatie te genereren, wijzigt u de naam van een paar kolommen en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. Benadering 2 (sale_by_date_employee): Gebruik Spark SQL om gegevens samen te voegen en samen te voegen voor het genereren van bedrijfsaggregaties. Met de volgende code maakt u een tijdelijke Spark-weergave door drie tabellen samen te voegen, groepeer op om aggregatie te genereren en wijzigt u enkele kolommen. Ten slotte leest u uit de tijdelijke Spark-weergave en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van lakehouse om deze te behouden met de gegevens.

    In deze cel maakt u een tijdelijke Spark-weergave door drie tabellen samen te voegen, groeperen op om aggregatie te genereren en een aantal kolommen een andere naam te geven.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    In deze cel leest u uit de tijdelijke Spark-weergave die in de vorige cel is gemaakt en schrijft u deze ten slotte als een Delta-tabel in de sectie Tabellen van het lakehouse.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. Als u de gemaakte tabellen wilt valideren, klikt u met de rechtermuisknop en selecteert u Vernieuwen in het lakehouse wwilakehouse. De samengevoegde tabellen worden weergegeven.

    Schermopname van Lakehouse Explorer waarin wordt weergegeven waar de nieuwe tabellen worden weergegeven.

De twee benaderingen produceren een vergelijkbaar resultaat. Als u de noodzaak voor het leren van een nieuwe technologie of inbreuk op prestaties wilt minimaliseren, kiest u de methode die het beste bij uw achtergrond en voorkeur past.

Mogelijk merkt u dat u gegevens schrijft als Delta Lake-bestanden. De automatische functie voor tabeldetectie en -registratie van Fabric wordt opgehaald en geregistreerd in de metastore. U hoeft niet expliciet instructies aan te roepen CREATE TABLE om tabellen te maken die u met SQL wilt gebruiken.

Volgende stap