在 Azure Data Factory 中,透過使用 Spark 活動項目在雲端進行資料轉換。

適用於: Azure Data Factory Azure Synapse Analytics

秘訣

Data Factory in Microsoft Fabric 是下一代的 Azure Data Factory,擁有更簡單的架構、內建 AI 及新功能。 如果你是資料整合新手,建議先從 Fabric Data Factory 開始。 現有的 ADF 工作負載可升級至 Fabric,以存取資料科學、即時分析與報告等新能力。

在這個教學中,你會使用 Azure 入口網站建立 Azure Data Factory 管線。 此管線會使用 Spark 活動和隨選 Azure HDInsight 連結服務來轉換資料。

您會在本教學課程中執行下列步驟:

  • 建立資料處理站。
  • 建立使用 Spark 活動的管線。
  • 觸發管線執行。
  • 監視管道執行。

如果你沒有Azure訂閱,請在開始前建立一個free帳號

必要條件

注意

我們建議你使用 Azure Az PowerShell 模組來與 Azure 互動。 要開始,請參考 安裝 Azure PowerShell。 想了解如何遷移到 Az PowerShell 模組,請參考 Migrate Azure PowerShell from AzureRM to Az

  • Azure 儲存帳號。 你建立一個 Python 腳本和一個輸入檔案,然後上傳到 Azure Storage。 Spark 程式的輸出會儲存在這個儲存體帳戶中。 隨選 Spark 叢集使用相同的儲存體帳戶作為其主要儲存體。

注意

HdInsight 僅支援標準層的一般用途儲存體帳戶。 請確定帳戶不是高級帳戶或僅限於 Blob 儲存的帳戶。

把 Python 腳本上傳到 Blob 儲存帳號

  1. 建立一個名為 WordCount_Spark.pyPython 的檔案,內容如下:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> 替換成你Azure的儲存帳戶名稱。 然後儲存檔案。

  3. 在 Azure Blob 儲存中,若不存在,請建立一個名為 adftutorial 的容器。

  4. 建立名為 spark 的資料夾。

  5. spark 資料夾下,建立名為 script 的子資料夾。

  6. WordCount_Spark.py 檔案上傳至 script 子資料夾。

上傳輸入檔案

  1. 建立名為 minecraftstory.txt 的檔案並填入一些文字。 Spark 程式會計算這段文字中的字數。
  2. spark 資料夾下,建立名為 inputfiles 的子資料夾。
  3. minecraftstory.txt 檔案上傳至 inputfiles 子資料夾。

建立資料處理站

如果你還沒有資料工廠,請依照文章快速入門:使用 Azure portal 建立資料工廠 的步驟來建立資料工廠。

建立連結服務

在本節中,您會創建兩個關聯服務:

  • 一個Azure 儲存體連結服務,將 Azure 儲存體帳號連結到 Data Factory。 隨選 HDInsight 叢集會使用此儲存體。 而且也包含要執行的 Spark 指令碼。
  • HDInsight 隨選連結服務。 Azure Data Factory 會自動建立 HDInsight 叢集並執行 Spark 程式。 然後在 HDInsight 叢集的閒置時間達到預先設定的時間後,系統就會刪除該叢集。

建立一個 Azure Storage 連結服務

  1. 在首頁,切換至左側面板中的 [管理] 索引標籤。

    顯示 [管理] 索引標籤的螢幕快照。

  2. 選取視窗底部的 [連線],然後選取 [+ 新增]

    建立新聯機的按鈕

  3. New Link Service 視窗中,選擇 Data Store>Azure Blob Storage,然後選擇 Continue

    選擇「Azure Blob Storage」圖塊

  4. 針對 [儲存體帳戶名稱],從清單中選取名稱,然後選取 [儲存]

    指定記憶體帳戶名稱的方塊

建立隨需 HDInsight 連線服務

  1. 再次選取 [+新增] 按鈕以建立另一個連結服務。

  2. New Link Service 視窗中,選擇 Compute>Azure HDInsight,然後選擇 Continue

    選擇「Azure HDInsight」磚塊

  3. 在 [新增連結服務] 視窗中,完成下列步驟:

    a. 針對 [名稱],輸入 AzureHDInsightLinkedService

    b. 針對 [類型],確認已選取 [隨選 HDInsight]

    c. 對於 Azure Storage Linked Service,選擇 AzureBlobStorage1。 您之前已建立此連結服務。 如果您使用不同的名稱,請在此指定正確的名稱。

    d. 針對 叢集類型,選取 spark

    e. 針對 [服務主體識別碼],輸入有權建立 HDInsight 叢集的服務主體識別碼。

    此服務主體必須是訂用帳戶的參與者角色成員,或建立叢集所在的資源群組成員。 欲了解更多資訊,請參見 建立Microsoft Entra應用程式與服務主體服務主體識別碼相當於「應用程式識別碼」,而服務主體金鑰則相當於「用戶端密碼」的值。

    f. 針對 [服務主體金鑰],輸入金鑰。

    g. 針對 [資源群組],選取建立資料處理站時使用的相同資源群組。 將在此資源群組中建立 Spark 叢集。

    h. 展開 OS 類型

    i. 針對叢集使用者名稱輸入名稱。

    j. 輸入使用者的叢集密碼

    k. 選取 [完成]。

    HDInsight 鏈接服務設定

注意

Azure HDInsight 會限制你在每個 Azure 區域中可使用的總核心數量。 對於隨選 HDInsight 連結服務,HDInsight 叢集會建立在與其主要儲存體所使用的相同 Azure 儲存體位置。 請確定您有足夠的核心配額,才能成功建立叢集。 如需詳細資訊,請參閱使用 Hadoop、Spark 及 Kafka 等在 HDInsight 中設定叢集

建立新管線

  1. 選取 + (加號) 按鈕,然後選取選單上的 管線

    建立新管線的按鈕

  2. 在 [活動工具] 工具箱中,展開 [HDInsight]。 將 活動 工具箱中的 Spark 活動拖放到管線設計工具介面。

    拖曳 Spark 活動

  3. 在 [Spark] 活動視窗底部的屬性設定裡,完成下列步驟:

    a. 切換至 “HDI 叢集”索引標籤

    b. 選取您在上一個程序中建立的 AzureHDInsightLinkedService

    指定 HDInsight 連結服務

  4. 切換至 指令碼/Jar 索引標籤,然後完成以下步驟:

    a. 針對「作業連結服務」,選取「AzureBlobStorage1」

    b. 選取 瀏覽儲存體

    在“腳本/Jar”標籤中指定 Spark 腳本

    c. 瀏覽至 adftutorial/spark/script 資料夾,選取 WordCount_Spark.py,然後選取 [完成]

  5. 若要驗證管線,選取工具列上的 [驗證] 按鈕。 選取 >> (右箭頭) 按鈕以關閉驗證視窗。

    “Validate”按鈕

  6. 選擇全部發佈。 Data Factory 介面會將實體(連結服務與管線)發佈至 Azure Data Factory 服務。

    “發佈全部”按鈕

觸發管線執行

選取工具列上的 [新增觸發程序],然後選取 [立即觸發]

“Trigger”和“Trigger Now”按鈕

監視管道運行

  1. 切換至 監視 索引標籤,確認您看到管道運行。 建立 Spark 叢集需要約 20 分鐘。

  2. 請定期選取 [重新整理] 以檢查管線運行狀態。

    用於監視管線執行的索引標籤,帶有 "Refresh" 按鈕

  3. 若要檢視與管線執行相關聯的活動執行,請選取動作資料行中的 檢視活動執行

    流水線運行狀態

    您可以選取頂端的所有管線執行連結,切換回管線執行檢視。

    “活動執行”視圖

驗證輸出

確認已在 adftutorial 容器的 spark/otuputfiles/wordcount 資料夾中建立輸出檔案。

輸出檔案的位置

這個檔案應該有輸入文字檔中的每個字組,以及字組在檔案中出現的次數。 例如:

(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)

此範例中的管線會使用 Spark 活動和隨選 HDInsight 連結服務來轉換資料。 您已了解如何︰

  • 建立資料處理站。
  • 建立使用 Spark 活動的管線。
  • 觸發管線執行。
  • 監視管道執行。

想了解如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 腳本來轉換資料,請繼續閱讀下一個教學:

教學:在 Azure Virtual Network 中使用 Hive 進行資料轉換。