共用方式為


如何透過 Microsoft Fabric REST API 建立並更新帶有 V2 格式的 Spark 工作定義

Spark 工作定義(SJD)是一種 Fabric 項目,允許使用者在 Microsoft Fabric 中定義並執行 Apache Spark 工作。 Spark 工作定義 API v2 允許使用者以新的格式 SparkJobDefinitionV2 建立並更新 Spark 工作定義項目。 使用 v2 格式的主要好處是使用者只需一次 API 呼叫即可管理主執行檔及其他函式庫檔案,無需使用 Storage API 來分別上傳檔案,因此不需要額外的儲存權杖來管理檔案。

先決條件

  • 存取 Fabric REST API 需要 Microsoft Entra 憑證。 建議使用 MSAL(Microsoft 認證庫)函式庫來取得該憑證。 欲了解更多資訊,請參閱 MSAL 中的認證流程支援

Microsoft Fabric REST API 定義了 Fabric 項目 CRUD 操作的統一端點。 該端點為 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items

Spark Job 定義 v2 格式概述

在管理 Spark Job 定義項目的有效載荷中,欄位 definition 用來指定 Spark Job 定義項目的詳細設定。 該 definition 域包含兩個子域: formatparts。 欄位 format 指定 Spark Job 定義項目的格式,應為 SparkJobDefinitionV2 v2 格式。

parts 欄位是一個陣列,包含 Spark Job 定義項目的詳細設定。 陣列中的 parts 每個項目代表詳細設定的一部分。 每個部分包含三個子欄位:path、、 payloadpayloadType和 。 欄位 path 指定零件的路徑, payload 欄位表示以base64編碼的零件內容, payloadType 欄位表示有效載荷的類型,應為 InlineBase64

這很重要

此 v2 格式僅支援 .py 或 .scala 檔案格式的 Spark 工作定義。 .jar檔案格式不支援。

建立一個包含主定義檔及其他 lib 檔的 Spark 工作定義項目

在以下範例中,我們將建立一個 Spark 工作定義項目,內容如下:

  1. 名稱是 SJDHelloWorld
  2. 主要定義檔是 main.py,也就是從預設 Lakehouse 讀取 CSV 檔案,並儲存成 Delta 表格回到同一個 Lakehouse。
  3. 另一個 lib 檔案是 libs.py,它有一個工具函式,可以回傳 CSV 檔案名稱和 Delta 表格。
  4. 預設的 Lakehouse 被設定為特定的 Lakehouse artifact ID。

以下是建立 Spark Job 定義項目的詳細有效載荷。

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

要解碼或編碼詳細設定,你可以使用以下 Python 輔助函式。 還有其他線上工具 https://www.base64decode.org/ 也能完成同樣的工作。

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

HTTP 代碼 202 回應表示 Spark Job 定義項目已成功建立。

以 v2 格式取得 Spark 工作定義及其定義部分

在新的 v2 格式中,當取得帶有定義部分的 Spark Job 定義項目時,主定義檔案及其他 lib 檔案的檔案內容都會包含在回應有效載荷中,base64 編碼於欄位下方 parts 。 這裡有一個取得 Spark 工作定義項目與定義零件的範例:

  1. 首先,向端點 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2發出 POST 請求。 確保格式查詢參數的值為 SparkJobDefinitionV2
  2. 接著,在回應標頭中檢查 HTTP 狀態碼。 HTTP 代碼 202 表示請求已成功接受。 從回應標頭複製 x-ms-operation-id 值。
  3. 最後,向端點 https://api.fabric.microsoft.com/v1/operations/{operationId} 發送 GET 請求,取得複製 x-ms-operation-id 值以取得操作結果。 在回應有效載荷中,欄位 definition 包含 Spark Job 定義項目的詳細設定,包括主定義檔及該欄位下 parts 的其他 lib 檔。

以 v2 格式更新 Spark 工作定義項目,包含主定義檔及其他 lib 檔

若要以 v2 格式更新現有的 Spark Job Definition 項目,包含主定義檔及其他 lib 檔案,您可以使用類似於 create 操作的有效載荷結構。 以下是更新前一節建立的 Spark Job 定義項目的範例:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

在上述有效載荷下,檔案會進行以下變更:

  1. main.py 檔案會更新並加入新內容。
  2. 該 lib1.py 從此 Spark Job 定義項目中被刪除,並從 OneLake 儲存中移除。
  3. 一個新的 lib2.py 檔案會被加入這個 Spark Job 定義項目,並上傳到 OneLake 儲存空間。

要更新 Spark Job 定義項目,請向帶有上述有效載荷的端 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} 點發送 POST 請求。 HTTP 代碼 202 回應表示 Spark 工作定義項目已成功更新。