Spark 工作定義(SJD)是一種 Fabric 項目,允許使用者在 Microsoft Fabric 中定義並執行 Apache Spark 工作。 Spark 工作定義 API v2 允許使用者以新的格式 SparkJobDefinitionV2 建立並更新 Spark 工作定義項目。 使用 v2 格式的主要好處是使用者只需一次 API 呼叫即可管理主執行檔及其他函式庫檔案,無需使用 Storage API 來分別上傳檔案,因此不需要額外的儲存權杖來管理檔案。
先決條件
- 存取 Fabric REST API 需要 Microsoft Entra 憑證。 建議使用 MSAL(Microsoft 認證庫)函式庫來取得該憑證。 欲了解更多資訊,請參閱 MSAL 中的認證流程支援。
Microsoft Fabric REST API 定義了 Fabric 項目 CRUD 操作的統一端點。 該端點為 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items。
Spark Job 定義 v2 格式概述
在管理 Spark Job 定義項目的有效載荷中,欄位 definition 用來指定 Spark Job 定義項目的詳細設定。 該 definition 域包含兩個子域: format 和 parts。 欄位 format 指定 Spark Job 定義項目的格式,應為 SparkJobDefinitionV2 v2 格式。
該 parts 欄位是一個陣列,包含 Spark Job 定義項目的詳細設定。 陣列中的 parts 每個項目代表詳細設定的一部分。 每個部分包含三個子欄位:path、、 payloadpayloadType和 。 欄位 path 指定零件的路徑, payload 欄位表示以base64編碼的零件內容, payloadType 欄位表示有效載荷的類型,應為 InlineBase64。
這很重要
此 v2 格式僅支援 .py 或 .scala 檔案格式的 Spark 工作定義。 .jar檔案格式不支援。
建立一個包含主定義檔及其他 lib 檔的 Spark 工作定義項目
在以下範例中,我們將建立一個 Spark 工作定義項目,內容如下:
- 名稱是
SJDHelloWorld。 - 主要定義檔是
main.py,也就是從預設 Lakehouse 讀取 CSV 檔案,並儲存成 Delta 表格回到同一個 Lakehouse。 - 另一個 lib 檔案是
libs.py,它有一個工具函式,可以回傳 CSV 檔案名稱和 Delta 表格。 - 預設的 Lakehouse 被設定為特定的 Lakehouse artifact ID。
以下是建立 Spark Job 定義項目的詳細有效載荷。
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
要解碼或編碼詳細設定,你可以使用以下 Python 輔助函式。 還有其他線上工具 https://www.base64decode.org/ 也能完成同樣的工作。
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
HTTP 代碼 202 回應表示 Spark Job 定義項目已成功建立。
以 v2 格式取得 Spark 工作定義及其定義部分
在新的 v2 格式中,當取得帶有定義部分的 Spark Job 定義項目時,主定義檔案及其他 lib 檔案的檔案內容都會包含在回應有效載荷中,base64 編碼於欄位下方 parts 。 這裡有一個取得 Spark 工作定義項目與定義零件的範例:
- 首先,向端點
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2發出 POST 請求。 確保格式查詢參數的值為SparkJobDefinitionV2。 - 接著,在回應標頭中檢查 HTTP 狀態碼。 HTTP 代碼 202 表示請求已成功接受。 從回應標頭複製
x-ms-operation-id值。 - 最後,向端點
https://api.fabric.microsoft.com/v1/operations/{operationId}發送 GET 請求,取得複製x-ms-operation-id值以取得操作結果。 在回應有效載荷中,欄位definition包含 Spark Job 定義項目的詳細設定,包括主定義檔及該欄位下parts的其他 lib 檔。
以 v2 格式更新 Spark 工作定義項目,包含主定義檔及其他 lib 檔
若要以 v2 格式更新現有的 Spark Job Definition 項目,包含主定義檔及其他 lib 檔案,您可以使用類似於 create 操作的有效載荷結構。 以下是更新前一節建立的 Spark Job 定義項目的範例:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
在上述有效載荷下,檔案會進行以下變更:
- main.py 檔案會更新並加入新內容。
- 該 lib1.py 從此 Spark Job 定義項目中被刪除,並從 OneLake 儲存中移除。
- 一個新的 lib2.py 檔案會被加入這個 Spark Job 定義項目,並上傳到 OneLake 儲存空間。
要更新 Spark Job 定義項目,請向帶有上述有效載荷的端 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} 點發送 POST 請求。 HTTP 代碼 202 回應表示 Spark 工作定義項目已成功更新。