練習 - 將筆記本整合到 Azure Synapse 管線內

已完成

在此單元中,您將建立 Azure Synapse Spark 筆記本,以分析和轉換由對應資料流載入的資料,並將資料儲存在資料湖中。 您將建立參數資料格來接受字串參數,以針對筆記本寫入資料湖的資料,定義資料夾名稱。

然後,您會將此筆記本新增至 Synapse 管線,並將唯一的管線執行識別碼傳遞至筆記本參數,讓您稍後可以將管線執行與筆記本活動所儲存的資料相互關聯。

最後,您會使用 Synapse Studio 中的 [監視]中樞來監視管線執行、取得執行識別碼,然後找出儲存在資料湖中的對應檔案。

關於 Apache Spark 和筆記本

Apache Spark 是一個平行處理架構,可支援記憶體內部處理,以大幅提升巨量資料分析應用程式的效能。 Azure Synapse Analytics 中的 Apache Spark 是 Microsoft 在雲端中的其中一種 Apache Spark 實作。

Synapse Studio 中的 Apache Spark 筆記本是 Web 介面,可讓您建立包含即時程式碼、視覺效果和敘述文的檔案。 筆記本是驗證想法和使用快速實驗從您的資料取得見解的絕佳位置。 筆記本也廣泛用於資料準備、資料視覺效果、機器學習和其他巨量資料案例中。

建立 Synapse Spark 筆記本

假設您在 Synapse Analytics 中建立對應資料流,以處理、聯結和匯入使用者設定檔資料。 現在,您想根據哪些為喜愛和首選,且過去 12 個月內購買最多的產品,找出每個使用者的前 5 名產品。 然後,您想要計算全體產品的前 5 名。

在此練習中,您建立 Synapse Spark 筆記本來進行這些計算。

  1. 開啟 Synapse Analytics Studio (https://web.azuresynapse.net/),然後移至 [資料]中樞。

    The Data menu item is highlighted.

  2. 選取 [已連結]索引標籤 (1),然後展開 [Azure Data Lake Storage Gen2] 下方的主要資料湖儲存體帳戶 (2)。 選取 [wwi-02]容器 (3),然後開啟 [top-products]資料夾 (4)。 以滑鼠右鍵按一下任何 Parquet 檔案 (5),選取 [新增筆記本]功能表項目 (6),然後選取 [載入至 DataFrame] (7)。 如果您沒看到該資料夾,請選取 Refresh

    The Parquet file and new notebook option are highlighted.

  3. 請確定筆記本已附加至 Spark 集區。

    The attach to Spark pool menu item is highlighted.

  4. 將 Parquet 檔案名稱換成 *.parquet(1),以選取 top-products 資料夾中的所有 Parquet 檔案。 例如,路徑應該類似於:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet

    The filename is highlighted.

  5. 在筆記本工具列上,選取 [全部執行]以執行筆記本。

    The cell results are displayed.

    注意

    當您第一次在 Spark 集區中執行筆記本時,Synapse 會建立新的工作階段。 這可能需要大約 3-5 分鐘的時間。

    注意

    若只要執行資料格,請將滑鼠游標暫留在資料格上,並選取資料格左側的 [執行資料格]圖示,或是選取資料格,然後在鍵盤上輸入 Ctrl+Enter

  6. 選取 + 按鈕,然後選取 [程式碼資料格]項目,在下方建立新的資料格。 [+] 按鈕位於左側筆記本資料格下方。 或者,您也可以展開筆記本工具列中的 [+ 資料格]功能表,然後選取 [程式碼資料格]項目。

    The Add Code menu option is highlighted.

  7. 在新的資料格中執行下列命令,以填入新的 dataframe (名為 topPurchases)、建立新的暫時檢視 (名為 top_purchases),以及顯示前 100 個資料列:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    輸出應如下所示:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. 在新的資料格中,使用 SQL 執行下列命令,以建立新的暫時檢視:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    注意

    此查詢沒有輸出。

    此查詢使用 top_purchases 暫時檢視作為來源,並套用 row_number() over 方法,以針對每位使用者找出 ItemsPurchasedLast12Months 最大的記錄,然後請求資料列編號。 where 子句會篩選結果,因此,我們最多只取出 IsTopProductIsPreferredProduct 都設為 true 的五個產品。 這讓我們知道每位使用者購買最多的前五名產品,而根據儲存在 Azure Cosmos DB 中的使用者設定檔,這些產品「也」識別為喜愛的產品。

  9. 在新的資料格中執行下列命令,來建立並顯示新的 DataFrame,以儲存您在上一個資料格中建立的 top_5_products 暫時檢視的結果:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    您應該會看到類似下面的輸出,其中顯示每位使用者喜愛的前五名產品:

    The top five preferred products are displayed per user.

  10. 根據客戶喜愛且購買最多的產品,計算全體產品的前五名。 若要這麼做,請在新資料格中執行下列命令:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    在此資料格中,我們依產品識別碼將前五名喜愛的產品分組,加總過去 12 個月內購買的項目總數,以遞減順序排序該值,然後傳回前五名結果。 您的輸出應如下所示:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

建立參數資料格

Azure Synapse 管線會尋找參數資料格,並將此資料格視為執行階段傳入之參數的預設值。 執行引擎會在參數資料格下方新增資料格,且帶有輸入參數來覆寫預設值。 如果未指定參數資料格,則插入的資料格會插入到筆記本的頂端。

  1. 我們將從管線執行此筆記本。 我們想要傳入參數來設定 runId 變數值,以用來命名 Parquet 檔案。 在新資料格中執行下列命令:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    我們使用 uuid Spark 隨附的程式庫來產生隨機的 GUID。 我們想要以管線傳入的參數來覆寫 runId 變數。 若要這樣做,我們必須將此資料格切換為參數資料格。

  2. 選取資料格右上角的動作省略符號 (...)(1),然後選取 [切換參數資料格](2)

    The menu item is highlighted.

    切換此選項之後,您在資料格上會看到 [參數]標記。

    The cell is configured to accept parameters.

  3. 將下列程式碼貼在新的資料格中,以便在主要資料湖帳戶的 /top5-products/ 路徑中,使用 runId 變數當作 Parquet 檔案名稱。 在路徑中,以主要資料湖帳戶的名稱取代 YOUR_DATALAKE_NAME。 若要找出此變數,請向上捲動至頁面頂端的 [資料格 1](1)。 從路徑複製資料湖儲存體帳戶 (2)。 在新的資料格內,貼上此值取代路徑中的 YOUR_DATALAKE_NAME(3),然執行資料格中的命令。

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    The path is updated with the name of the primary data lake account.

  4. 確認檔案已寫入資料湖。 移至 [資料]中樞,然後選取 [已連結]索引標籤 (1)。 展開主要資料湖儲存體帳戶,然後選取 [wwi-02] 容器 (2)。 移至 [top5-products]資料夾 (3)。 您應該會在目錄中看到 Parquet 檔案的資料夾,檔案名稱是 GUID (4)

    The parquet file is highlighted.

    因為先前不存在此目錄,所以在筆記本資料格中,dataframe 上的 Parquet 寫入方法就建立了此目錄。

將筆記本新增至 Synapse 管線

回到我們在練習開頭所述的對應資料流,假設在協調流程中執行資料流程之後,您想要執行此筆記本。 若要這樣做,請將此筆記本新增至管線,成為新的筆記本活動。

  1. 返回筆記本。 選取筆記本右上角的 [屬性](1),然後在 [名稱] (2) 中輸入 Calculate Top 5 Products

    The properties blade is displayed.

  2. 選取筆記本右上角的 [新增至管線](1),然後選取 [現有管線] (2)

    The add to pipeline button is highlighted.

  3. 選取 [將使用者設定檔資料寫入 ASA]管線 (1),然後選取 [新增] *(2)

    The pipeline is selected.

  4. Synapse Studio 會將筆記本活動新增至管線。 將 [筆記本] 活動重新排列在 [資料流程] 活動的右邊。 選取 [資料流程] 活動,然後將 [成功]活動管線連線綠色方塊,拖曳至 [筆記本] 活動

    The green arrow is highlighted.

    [成功] 活動箭號指示管線在資料流程活動成功執行之後,執行筆記本活動。

  5. 選取 [筆記本] 活動 (1),然後選取 [設定]索引標籤 (2)、展開 [基底參數](3),再選取 [+ 新增] (4)。 在 [名稱] 欄位 (5) 中輸入 runId。 在 [類型](6) 中選取 [字串]。 在 [值]中,選取 [新增動態內容] (7)

    The settings are displayed.

  6. 在 [系統變數] (1) 下方,選取 [管線執行識別碼]。 這會將 @pipeline().RunId 新增至動態內容方塊 (2)。 選取 [完成] (3) 以關閉對話方塊。

    The dynamic content form is displayed.

    管線執行識別碼值是指派給每個管線執行的唯一 GUID。 我們會將此值當成 runId 筆記本參數傳入,以作為 Parquet 檔案的名稱。 接著,我們可以翻閱管線執行歷程記錄,找出每次管線執行所建立的特定 Parquet 檔案。

  7. 選取 [全部發佈],然後按一下 [發佈] 以儲存變更。

    Publish all is highlighted.

  8. 發佈完成後,選取 [新增觸發程序] (1),然後選取 [立即觸發](2),以執行更新的管線。

    The trigger menu item is highlighted.

  9. 選取 [確定]以執行觸發程序。

    The OK button is highlighted.

監視管道執行

[監視] 中樞可讓您監視 SQL、Apache Spark 和 Pipelines 的目前和歷史活動。

  1. 前往 [監視] 中樞。

    The Monitor hub menu item is selected.

  2. 選取 [管線執行] (1),然後等候管線執行成功完成 (2)。 您可能需要重新整理 (3) 檢視。

    The pipeline run succeeded.

  3. 選取管線的名稱,以檢視管線的活動執行。

    The pipeline name is selected.

  4. 請注意 [資料流程]活動和新的 [筆記本]活動 (1)。 記下 [管線執行識別碼](2)。 我們會比較此值與筆記本所產生的 Parquet 檔案名稱。 選取 [計算前 5 名產品]筆記本名稱,以檢視其詳細資料 (3)

    The pipeline run details are displayed.

  5. 在這裡,我們看到筆記本執行詳細資料。 您可以選取 [播放](1),以觀看作業 (2) 進度的播放。 在底部,您可以使用不同的篩選選項 (3),以檢視 [診斷]和 [記錄]。 在右側,我們可以檢視執行詳細資料,例如持續時間、Livy 識別碼、Spark 集區詳細資料等等。 選取作業上的 [檢視詳細資料] 連結,以檢視其詳細資料 (5)

    The run details are displayed.

  6. Spark 應用程式 UI 會在新的索引標籤中開啟,讓我們查看階段詳細資料。 展開 [DAG 視覺效果],以檢視階段詳細資料。

    The Spark stage details are displayed.

  7. 返回 [資料] 中樞。

    Data hub.

  8. 選取 [已連結]索引標籤 (1),然後選取主要資料湖儲存體帳戶上的 [wwi-02]容器 (2)、移至 [top5-products]資料夾 (3),然後針對名稱符合管線執行識別碼的 Parquet 檔案,確認有資料夾存在。

    The file is highlighted.

    如您所見,有一個檔案的名稱符合我們稍早記下的管線執行識別碼

    The Pipeline run ID is highlighted.

    因為我們已將管線執行識別碼傳給筆記本活動的 runId 參數,所以這些值相符。