在 Azure Data Factory 中使用 Spark 活動轉換雲端中的數據

適用於: Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的單一分析解決方案。 Microsoft Fabric 涵蓋從數據移動到數據科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版

在本教學課程中,您會使用 Azure 入口網站 來建立 Azure Data Factory 管線。 此管線會使用 Spark 活動和隨選 Azure HDInsight 連結服務來轉換數據。

您會在本教學課程中執行下列步驟:

  • 建立資料處理站。
  • 建立使用Spark活動的管線。
  • 觸發管線執行。
  • 監視管線執行。

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

注意

建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 請參閱安裝 Azure PowerShell 以開始使用。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az

  • Azure 記憶體帳戶。 您可以建立 Python 腳本和輸入檔案,並將其上傳至 Azure 儲存體。 Spark 程式的輸出會儲存在此記憶體帳戶中。 隨選 Spark 叢集會使用與其主要記憶體相同的記憶體帳戶。

注意

HdInsight 僅支援具有標準層的一般用途記憶體帳戶。 請確定帳戶不是進階或僅限 Blob 的記憶體帳戶。

  • Azure PowerShell。 遵循如何安裝和設定 Azure PowerShell 中的指示。

將 Python 腳本上傳至您的 Blob 記憶體帳戶

  1. 使用下列內容建立名為 WordCount_Spark.py 的 Python 檔案:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. 以您的 Azure 記憶體帳戶名稱取代 storageAccountName>。< 然後儲存檔案。

  3. 在 Azure Blob 記憶體中,如果容器不存在,請建立名為 adftutorial 的容器。

  4. 建立名為 spark的資料夾。

  5. 在spark資料夾下建立名為 script子資料夾。

  6. WordCount_Spark.py 檔案上傳至 腳本 子資料夾。

上傳輸入檔案

  1. 使用一些文字建立名為 minecraftstory.txt 的檔案。 Spark 程式會計算此文字中的字數。
  2. 在spark資料夾中建立名為 inputfiles子資料夾。
  3. minecraftstory.txt 檔案上傳至 inputfiles 子資料夾。

建立資料處理站

請依照快速入門:使用 Azure 入口網站 建立數據處理站一文中的步驟,如果您還沒有使用數據處理站,請建立數據處理站。

建立連結服務

您會在本節中撰寫兩個連結服務:

  • Azure 儲存體 連結服務,可將 Azure 記憶體帳戶連結至數據處理站。 隨選 HDInsight 叢集會使用此記憶體。 它也包含要執行的Spark腳本。
  • 隨選 HDInsight 鏈接服務。 Azure Data Factory 會自動建立 HDInsight 叢集並執行 Spark 程式。 接著,它會在叢集閑置之後刪除 HDInsight 叢集,以預先設定的時間。

建立 Azure 儲存體鏈接服務

  1. 在首頁上,切換至左面板中的 [ 管理] 索引標籤。

    Screenshot that shows the Manage tab.

  2. 選取視窗底部的 連線,然後選取 [+ 新增]。

    Buttons for creating a new connection

  3. 在 [新增鏈接服務] 視窗中,選取 [數據存放區> Azure Blob 儲存體],然後選取 [繼續]。

    Selecting the "Azure Blob Storage" tile

  4. 針對 [儲存體 帳戶名稱],從清單中選取名稱,然後選取 [儲存]。

    Box for specifying the storage account name

建立隨選 HDInsight 鏈接服務

  1. 再次選取 [ + 新增 ] 按鈕,以建立另一個鏈接服務。

  2. 在 [新增鏈接服務] 視窗中,選取 [計算>Azure HDInsight],然後選取 [繼續]。

    Selecting the "Azure HDInsight" tile

  3. 在 [ 新增鏈接服務 ] 視窗中,完成下列步驟:

    a. 針對 [ 名稱],輸入 AzureHDInsightLinkedService

    b. 針對 [類型],確認已 選取 [隨選 HDInsight ]。

    c. 針對 [Azure 儲存體 鏈接服務],選取 [AzureBlob 儲存體 1]。 您稍早已建立此連結服務。 如果您使用不同的名稱,請在這裡指定正確的名稱。

    d. 針對 [ 叢集類型],選取 spark

    e. 針對 [服務主體標識符],輸入有權建立 HDInsight 叢集的服務主體標識碼。

    此服務主體必須是訂用帳戶或叢集建立所在的資源群組參與者角色的成員。 如需詳細資訊,請參閱 建立 Microsoft Entra 應用程式和服務主體。 服務主體標識碼相當於應用程式識別碼,而服務主體金鑰相當於客戶端密碼的值

    f. 針對 [服務主體金鑰],輸入金鑰。

    .g 針對 [ 資源群組],選取您在建立數據處理站時所使用的相同資源群組。 Spark 叢集會在此資源群組中建立。

    h. 展開 [操作系統類型]。

    i. 輸入叢集使用者名稱的名稱

    j. 輸入 使用者的 [叢集密碼 ]。

    k. 選取 [完成]

    HDInsight linked service settings

注意

Azure HDInsight 會限制您可以在支援的每個 Azure 區域中使用的核心總數。 針對隨選 HDInsight 連結服務,HDInsight 叢集會建立在與其主要記憶體相同的 Azure 儲存體 位置中。 請確定您有足夠的核心配額可成功建立叢集。 如需詳細資訊,請參閱 使用 Hadoop、Spark、Kafka 等在 HDInsight 中設定叢集。

建立新管線

  1. + 選取 [加號] 按鈕,然後選取功能表上的 [管線]。

    Buttons for creating a new pipeline

  2. 在 [ 活動 ] 工具箱中,展開 [HDInsight]。 將 Spark 活動從 [活動 ] 工具箱拖曳至管線設計工具介面。

    Dragging the Spark activity

  3. 在底部 Spark 活動視窗的屬性中,完成下列步驟:

    a. 切換至 [HDI 叢集] 索引標籤

    b. 選取 [AzureHDInsightLinkedService ] (您在上一個程式中建立的 )。

    Specifying the HDInsight linked service

  4. 切換至 [ 腳稿/Jar] 索引標籤,然後完成下列步驟:

    a. 針對 [作業鏈接服務],選取 [AzureBlob 儲存體 1]。

    b. 選取 [瀏覽 儲存體]。

    Specifying the Spark script on the "Script/Jar" tab

    c. 流覽至 adftutorial/spark/script 資料夾,選取 [WordCount_Spark.py],然後選取 [ 完成]。

  5. 若要驗證管線,請選取工具列上的 [ 驗證] 按鈕。 >> 選取 [右鍵] 按鈕以關閉驗證視窗。

    "Validate" button

  6. 選取 [ 全部發佈]。 Data Factory UI 會將實體(鏈接服務和管線)發佈至 Azure Data Factory 服務。

    "Publish All" button

觸發管線執行

選取 工具列上的 [新增觸發程式 ],然後選取 [ 立即觸發]。

"Trigger" and "Trigger Now" buttons

監視管道執行

  1. 切換至 [ 監視] 索引標籤。確認您看到管線執行。 建立 Spark 叢集大約需要 20 分鐘的時間。

  2. 定期選取 [重新整理] 以檢查管線執行的狀態。

    Tab for monitoring pipeline runs, with "Refresh" button

  3. 若要檢視與管線執行相關聯的活動執行,請選取 [動作] 資料行中的 [檢視活動執行]

    Pipeline run status

    您可以選取頂端的 [所有管線執行 ] 連結,切換回管線執行檢視。

    "Activity Runs" view

確認輸出

確認輸出檔案是在 adftutorial 容器的 spark/otuputfiles/wordcount 資料夾中建立的。

Location of the output file

檔案應該有輸入文字檔中的每個單字,以及檔案中出現單字的次數。 例如:

(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)

此範例中的管線會使用 Spark 活動和隨選 HDInsight 連結服務來轉換數據。 您已了解如何︰

  • 建立資料處理站。
  • 建立使用Spark活動的管線。
  • 觸發管線執行。
  • 監視管線執行。

若要瞭解如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 腳本來轉換數據,請繼續進行下一個教學課程:

教學課程:在 Azure 虛擬網絡 中使用 Hive 轉換數據。