Lakehouse-oktatóanyag: Adatok előkészítése és átalakítása a lakehouse-ban

Ebben az oktatóanyagban jegyzetfüzeteket fog használni a Spark-futtatókörnyezettel az adatok átalakításához és előkészítéséhez.

Fontos

A Microsoft Fabric előzetes verzióban érhető el.

Előfeltételek

Adatok előkészítése

Az előző oktatóanyag lépéseiből nyers adatokat gyűjtöttünk be a forrásból a tótárház Fájlok szakaszára. Most átalakíthatja az adatokat, és előkészítheti azokat deltatáblák létrehozására.

  1. Töltse le a jegyzetfüzeteket a Lakehouse Tutorial Source Code mappából.

  2. A képernyő bal alsó sarkában található élményváltóban válassza az Adatfeldolgozás lehetőséget.

    Képernyőkép arról, hogy hol található a felhasználói élményváltó, és válassza a adattervezés lehetőséget.

  3. Válassza a Jegyzetfüzet importálása lehetőséget a kezdőlap tetején található Új szakaszból.

  4. Válassza a Feltöltés lehetőséget a képernyő jobb oldalán megnyíló Importálás állapota panelen.

  5. Jelölje ki a szakasz 1. lépésében letöltött összes jegyzetfüzetet.

  6. Válassza a Megnyitás lehetőséget. Az importálás állapotát jelző értesítés a böngészőablak jobb felső sarkában jelenik meg.

    Képernyőkép a letöltött jegyzetfüzetek helyéről és a Megnyitás gombról.

  7. Az importálás sikeressége után megnyithatja a munkaterület elemek nézetét, és megtekintheti az újonnan importált jegyzetfüzeteket. A megnyitáshoz válassza a wwilakehouse lakehouse lehetőséget.

    Képernyőkép az importált jegyzetfüzetek listájáról és arról, hogy hol válassza ki a tótárházat.

  8. A wwilakehouse lakehouse megnyitása után válassza a jegyzetfüzet> megnyitásaMeglévő jegyzetfüzet lehetőséget a felső navigációs menüben.

    Képernyőkép a sikeresen importált jegyzetfüzetek listájáról.

  9. A meglévő jegyzetfüzetek listájában válassza a 01 – Delta-táblák létrehozása jegyzetfüzetet, és válassza a Megnyitás lehetőséget.

  10. A Lakehouse Explorer megnyitott jegyzetfüzetében láthatja, hogy a jegyzetfüzet már hozzá van kapcsolva a megnyitott tótárházhoz.

    Megjegyzés

    A Fabric biztosítja az optimalizált Delta Lake-fájlok írásához szükséges V-order képességet. A V-order gyakran 3-4-szeresével és akár tízszeres teljesítménygyorsítással javítja a nem optimalizált Delta Lake-fájlok tömörítését. A Spark in Fabric dinamikusan optimalizálja a partíciókat, miközben alapértelmezett 128 MB méretű fájlokat hoz létre. A célfájl mérete a számítási feladatokra vonatkozó követelmények szerint konfigurációk használatával módosítható. Az írás optimalizálása révén az Apache Spark-motor csökkenti az írott fájlok számát, és célja az írott adatok egyedi fájlméretének növelése.

  11. Mielőtt delta lake-táblákként ír adatokat a tótárház Táblák szakaszában, két Fabric-funkciót (V-order és Optimize Write) használ az optimalizált adatíráshoz és a jobb olvasási teljesítményhez. Ha engedélyezni szeretné ezeket a funkciókat a munkamenetben, állítsa be ezeket a konfigurációkat a jegyzetfüzet első cellájában.

    A jegyzetfüzet elindításához és az összes cella egymás utáni végrehajtásához válassza az Összes futtatása lehetőséget a felső menüszalagon (a Kezdőlap alatt). Vagy ha csak egy adott cellából szeretne kódot végrehajtani, jelölje ki a cellától balra megjelenő Futtatás ikont rámutatáskor, vagy nyomja le a SHIFT + ENTER billentyűkombinációt a billentyűzeten, miközben a vezérlő a cellában van.

    Képernyőkép a Spark-munkamenet konfigurációs képernyőjéről, beleértve a kódcellát és a Futtatás ikont.

    Cella futtatásakor nem kellett megadnia az alapul szolgáló Spark-készlet vagy -fürt adatait, mert a Háló élő készleten keresztül biztosítja őket. Minden Háló-munkaterülethez tartozik egy alapértelmezett Spark-készlet, az úgynevezett Élő készlet. Ez azt jelenti, hogy jegyzetfüzetek létrehozásakor nem kell aggódnia a Spark-konfigurációk vagy a fürt részleteinek megadása miatt. Az első jegyzetfüzetparancs végrehajtásakor az élő készlet néhány másodpercen belül működik. Ekkor létrejön a Spark-munkamenet, és megkezdi a kód végrehajtását. A további kódvégrehajtás szinte azonnal megtörténik ebben a jegyzetfüzetben, amíg a Spark-munkamenet aktív.

  12. Ezután beolvassa a nyers adatokat a tótárház Fájlok szakaszából, és az átalakítás részeként további oszlopokat ad hozzá a különböző dátumrészekhez. Végül a partitionBy Spark API használatával particionálja az adatokat, mielőtt az újonnan létrehozott adatrészoszlopok (Year és Quarter) alapján deltatáblaként íratja azokat.

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. A ténytáblák betöltése után továbbléphet a többi dimenzió adatainak betöltésére. Az alábbi cella létrehoz egy függvényt, amely nyers adatokat olvas be a tótárház Files szakaszából minden paraméterként átadott táblanévhez. Ezután létrehozza a dimenziótáblák listáját. Végül végighalad a táblák listáján, és létrehoz egy delta táblát a bemeneti paraméterből beolvasott minden egyes táblanévhez.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. A létrehozott táblák ellenőrzéséhez kattintson a jobb gombbal, és válassza a frissítés lehetőséget a wwilakehouse tóházban . Megjelennek a táblák.

    Képernyőkép a létrehozott táblák megkereséséről a Lakehouse Explorerben.

  15. Nyissa meg újra a munkaterület elemek nézetét, és válassza ki a wwilakehouse tóházat a megnyitásához.

  16. Most nyissa meg a második jegyzetfüzetet. A tótárház nézetben válassza a jegyzetfüzet> megnyitásaMeglévő jegyzetfüzet lehetőséget a menüszalagon.

  17. A meglévő jegyzetfüzetek listájából válassza a 02 – Adatátalakítás – Üzleti jegyzetfüzetet a megnyitásához.

    Képernyőkép a Meglévő jegyzetfüzet megnyitása menüről, amelyen látható, hogy hol válassza ki a jegyzetfüzetet.

  18. A Lakehouse Explorer megnyitott jegyzetfüzetében láthatja, hogy a jegyzetfüzet már hozzá van kapcsolva a megnyitott tótárházhoz.

  19. Előfordulhat, hogy egy szervezet adatmérnökei a Scalával/Pythonnal és az SQL-sel (Spark SQL vagy T-SQL) dolgozó más adatmérnökökkel dolgoznak, és mindegyik ugyanazon az adatpéldányon dolgozik. A háló lehetővé teszi, hogy ezek a különböző csoportok különböző tapasztalatokkal és előnyben részesítve dolgoznak és működjenek együtt. A két különböző megközelítés átalakítja és létrehozza az üzleti összesítéseket. Kiválaszthatja az Önnek megfelelőt, vagy keverheti és megfeleltetheti ezeket a megközelítéseket a preferenciája alapján anélkül, hogy veszélyeztetné a teljesítményt:

    • 1. megközelítés – A PySpark használatával összekapcsolhatja és összesítheti az üzleti aggregátumok létrehozásához szükséges adatokat. Ez a megközelítés előnyösebb, ha valaki programozási (Python vagy PySpark) háttérrel rendelkezik.

    • 2. megközelítés – A Spark SQL használata az üzleti aggregátumok létrehozásához szükséges adatok összekapcsolására és összesítésére. Ez a megközelítés előnyösebb egy SQL-háttérrel rendelkező felhasználó számára, aki a Sparkra vált.

  20. 1. megközelítés (sale_by_date_city) – A PySpark használatával összekapcsolhatja és összesítheti az üzleti összesítések létrehozásához szükséges adatokat. Az alábbi kóddal három különböző Spark-adatkeretet hozhat létre, mindegyik egy meglévő Delta-táblára hivatkozik. Ezután az adatkeretek használatával összekapcsolja ezeket a táblákat, csoportosítást végez az összesítés létrehozásához, átnevez néhány oszlopot, és végül deltatáblaként írja a tótárház Táblák szakaszában az adatokkal való megőrzéshez.

    Ebben a cellában három különböző Spark-adatkeretet hoz létre, mindegyik egy meglévő deltatáblára hivatkozik.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    Ebben a cellában összekapcsolja ezeket a táblákat a korábban létrehozott adatkeretek használatával, csoportosítást végez az összesítés létrehozásához, átnevez néhány oszlopot, és végül deltatáblaként írja be a tótárház Táblák szakaszába.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. 2. megközelítés (sale_by_date_employee) – A Spark SQL használatával összekapcsolhatja és összesítheti az üzleti összesítések létrehozásához szükséges adatokat. Az alábbi kóddal egy ideiglenes Spark-nézetet hozhat létre három tábla összekapcsolásával, csoportosítással összesítést hozhat létre, és átnevezhet néhány oszlopot. Végül felolvassa az ideiglenes Spark-nézetet, és végül deltatáblaként írja azt a tótárház Táblák szakaszában az adatok megőrzése érdekében.

    Ebben a cellában létrehoz egy ideiglenes Spark-nézetet három tábla összekapcsolásával, csoportosítással összesítést hoz létre, és átnevez néhány oszlopot.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    Ebben a cellában az előző cellában létrehozott ideiglenes Spark-nézetből olvassa be, és végül deltatáblaként írja azt a tótárház Táblák szakaszába.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. A létrehozott táblák ellenőrzéséhez kattintson a jobb gombbal, és válassza a frissítés lehetőséget a wwilakehouse tóházban . Megjelennek az összesítő táblák.

    Képernyőkép a Lakehouse Explorerről, amelyen az új táblák láthatók.

Mindkét megközelítés hasonló eredményt ad. A háttér és a beállítások alapján választhat, hogy minimálisra csökkentse az új technológiák megismerésének vagy a teljesítmény sérülésének szükségességét.

Azt is észreveheti, hogy delta lake-fájlként ír adatokat. A Háló automatikus táblafelderítési és -regisztrációs funkciója felveszi és regisztrálja őket a metaadattárban. Az SQL-hez használható táblák létrehozásához nem kell explicit módon meghívnia CREATE TABLE az utasításokat.

Következő lépések

Folytassa a következő cikkel, amelyből többet tudhat meg