分享方式:


Lakehouse 教學課程:準備和轉換 Lakehouse 中的資料

在本教學課程中,您會搭配 Spark 執行階段 使用筆記本,在 Lakehouse 中轉換和準備原始資料。

必要條件

如果您沒有包含資料的 Lakehouse,您必須:

準備資料

從先前的教學課程步驟中,我們已從來源擷取原始資料到 Lakehouse 的檔案區段。 現在您可以轉換該資料,並準備建立 Delta 資料表。

  1. Lakehouse 教學課程原始碼 資料夾下載筆記本。

  2. 從位於畫面左下方的 Power BI 切換器中,選取資料工程

    此螢幕擷取畫面顯示尋找切換器的位置,並選取資料工程師。

  3. 從登陸頁面頂端的新增 區段選取匯入筆記本

  4. 從畫面右側開啟的匯入狀態 窗格中選取上傳

  5. 選取您在本節第一個步驟中下載的所有筆記本。

    此螢幕擷取畫面顯示何處可尋找下載的筆記本和開啟 按鈕。

  6. 選取開啟。 指出匯入狀態的通知會出現在瀏覽器視窗右上角。

  7. 匯入成功之後,請移至工作區的項目檢視,並查看新匯入的筆記本。 選取 wwilakehouse lakehouse 以開啟它。

    此螢幕擷取畫面顯示匯入的筆記本清單,以及要選取 Lakehouse 的位置。

  8. 開啟 wwilakehouse Lakehouse 之後,請從頂端導覽功能表中選取開啟筆記本現有筆記本>

    顯示已成功匯入筆記本清單的螢幕擷取畫面。

  9. 從現有筆記本清單中,選取01 - 建立差異資料表 筆記本,然後選取 開啟

  10. 在 Lakehouse Explorer 的開啟筆記本中,您會看到筆記本已連結至已開啟的 Lakehouse。

    注意

    網狀架構提供 V 順序 功能來撰寫最佳化的 Delta Lake 檔案。 V 順序通常會在未最佳化之 Delta Lake 檔案上改善 3 到 4 倍的壓縮,以及最多 10 倍的效能加速。 Fabric 中的 Spark 會動態最佳化分割區,同時產生預設為 128 MB 大小的檔案。 目標檔案大小可以使用組態來變更每個工作負載需求。

    使用最佳化寫入功能,Apache Spark 引擎會減少寫入的檔案數目,並旨在增加寫入資料的個別檔案大小。

  11. 在 Lakehouse 的資料表 區段中將資料寫入為 Delta Lake Tables 之前,您會使用兩個網狀架構功能(V 順序和最佳化寫入)來將資料寫入最佳化,並改善讀取效能。 若要在您的工作階段中啟用這些功能,請在筆記本的第一個儲存格中組態這些設定。

    若要啟動筆記本並依序執行所有儲存格,請選取頂端功能區上的全部執行 (首頁下方)。 或者,若要只從特定儲存格執行程式碼,請選取滑鼠暫留時出現在儲存格左邊的執行圖示,或在控制項位於儲存格中時按鍵盤上的 SHIFT + ENTER

    Spark 工作階段設定畫面的螢幕擷取畫面,包括程式碼儲存格和執行圖示。

    執行儲存格時,您不需要指定基礎 Spark 集區或叢集詳細資料,因為 Fabric 會透過即時集區提供它們。 每個網狀架構工作區都隨附預設 Spark 集區,稱為「即時集區」。 這表示當您建立筆記本時,不需要擔心指定任何 Spark 組態或叢集詳細資料。 當您執行第一個筆記本命令時,即時集區會在幾秒鐘內啟動並執行。 而且會建立 Spark 工作階段,並開始執行程式碼。 當 Spark 工作階段處於使用中狀態時,此筆記本中的後續程式碼執行幾乎瞬間完成。

  12. 接下來,您會從 Lakehouse 的檔案區段讀取原始資料,並在轉換中新增更多不同日期部分的資料行。 最後,您可以使用資料分割 By Spark API 將資料分割,然後再根據新建立的資料部分資料行 (YearQuarter) 將它寫入為 Delta 資料表格式。

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. 在事實資料表載入之後,您可以繼續載入其餘維度的資料。 下列儲存格會建立函式,從 Lakehouse 的檔案區段讀取作為參數傳遞之每個表格名稱的未經處理資料。 接下來,它會建立維度資料表的清單。 最後,它會迴圈查看資料表清單,併為從輸入參數讀取的每個資料表名稱建立 Delta 資料表。 請注意,腳本會卸除此範例中名為 Photo 的資料行,因為未使用資料行。

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_customer'
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. 若要驗證已建立的資料表,請按滑鼠右鍵並選取 wwilakehouse Lakehouse 上的重新整理。 資料表隨即出現。

    此螢幕擷取畫面顯示在 Lakehouse 總管中尋找已建立資料表的位置。

  15. 再次移至工作區的項目檢視,然後選取 wwilakehouse lakehouse 加以開啟。

  16. 現在,開啟第二個筆記本。 在 Lakehouse 檢視中,從功能區選取開啟筆記本現有的筆記本>

  17. 從現有筆記本清單中,選取 02 - 資料轉換 - 商務 筆記本加以開啟。

    開啟現有筆記本功能表的螢幕擷取畫面,其中顯示要選取筆記本的位置。

  18. 在 Lakehouse Explorer 的開啟筆記本中,您會看到筆記本已連結至已開啟的 Lakehouse。

  19. 組織可能會有資料工程師使用 Scala/Python,以及其他使用 SQL 的資料工程師(Spark SQL 或 T-SQL),都處理相同的資料複本。 網狀架構可讓這些不同的群組使用不同的體驗和喜好設定,以工作和共同作業。 這兩種不同的方法會轉換併產生商務彙總。 您可以挑選適合您的方法,或根據您的喜好設定來混合和比對這些方法,而不會影響效能:

    • 方法 #1 - 使用 PySpark 聯結和彙總資料,以產生商務彙總。 這個方法最好是具有程序設計背景(Python 或 PySpark) 背景的人員。

    • 方法 #2 - 使用 Spark SQL 聯結和彙總資料,以產生商務彙總。 此方法最好是具有 SQL 背景、轉換至 Spark 的人員。

  20. 方法 #1 (sale_by_date_city) - 使用 PySpark 聯結和彙總資料來產生商務彙總。 使用下列程序代碼,您會建立三個不同的 Spark 資料框架,每個架構都會參考現有的 Delta 資料表。 然後,您可以使用資料框架來聯結這些資料表、執行分組來產生彙總、重新命名幾個資料行,最後將其寫入 Lakehouse 的資料表區段中的 Delta 資料表,以保存資料。

    在此儲存格中,您會建立三個不同的 Spark 資料框架,每個框架都會參考現有的 Delta 資料表。

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    將下列程式碼新增至相同的儲存格,以使用稍早建立的資料框架來連結這些資料表。 分組依據以產生彙總、重新命名幾個資料行,最後將其寫入 Lakehouse 的資料表區段中做為 Delta 資料表。

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. 方法 #2 (sale_by_date_employee) - 使用 Spark SQL 聯結和彙總資料,以產生商務彙總。 使用下列程式碼,您可以聯結三個資料表來建立暫存 Spark 檢視、執行分組以產生彙總,以及重新命名其中一些資料行。 最後,您會從暫存的 Spark 檢視讀取,最後將其寫入 Lakehouse 之 資料表區段中的 Delta 資料表,以保存資料。

    在此儲存格中,您會連結三個資料表來建立暫存 Spark 檢視、執行分組以產生彙總,以及重新命名幾個資料行。

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    在此儲存格中,您會從上一個儲存格中建立的暫存 Spark 檢視讀取,最後將其寫入 Lakehouse 的資料表區段中做為 Delta 資料表。

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. 若要驗證已建立的資料表,請按滑鼠右鍵並選取 wwilakehouse Lakehouse 上的重新整理。 彙總資料表隨即出現。

    Lakehouse 總管的螢幕擷取畫面,其中顯示新資料表的顯示位置。

這兩種方法會產生類似的結果。 若要將學習新技術或效能危害的需求降到最低,請選擇最符合背景和喜好設定的方法。

您可能會注意到您正在將資料寫入為 Delta Lake 檔案。 Fabric 的自動資料表探索和註冊功能會在中繼存放區中挑選並加以註冊。 您不需要明確呼叫 CREATE TABLE 語句,即可建立要與 SQL 搭配使用的資料表。

後續步驟