Spark 作业定义(SJD)是一种 Fabric 项类型,允许用户在 Microsoft Fabric 中定义和运行 Apache Spark 作业。 Spark 作业定义 API v2 允许用户使用名为 SparkJobDefinitionV2 的新格式创建和更新 Spark 作业定义项。 使用 v2 格式的主要好处是,它允许用户使用一个 API 调用管理主可执行文件和其他库文件,而不是使用存储 API 单独上传文件,无需再使用存储令牌来管理文件。
先决条件
- 访问 Fabric REST API 需要Microsoft Entra 令牌。 建议使用 MSAL(Microsoft身份验证库)库来获取令牌。 有关详细信息,请参阅 MSAL 中的身份验证流支持。
Microsoft Fabric REST API 定义了用于 Fabric 项 CRUD 操作的统一终结点。 终结点为 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items。
Spark 作业定义 v2 格式概述
在管理 Spark 作业定义项的有效负载中, definition 该字段用于指定 Spark 作业定义项的详细设置。 该 definition 字段包含两个子字段: format 和 parts。 该 format 字段指定 Spark 作业定义项的格式,该项应 SparkJobDefinitionV2 为 v2 格式。
该 parts 字段是一个数组,其中包含 Spark 作业定义项的详细设置。 数组中的每个 parts 项都表示详细设置的一部分。 每个部分包含三个子字段: path、 payload和 payloadType。 该 path 字段指定部件的路径, payload 字段指定 base64 编码的部件的内容,字段 payloadType 指定有效负载的类型,应为 InlineBase64。
重要
此 v2 格式仅支持具有.py或 .scala 文件格式的 Spark 作业定义。 不支持.jar文件格式。
使用主定义文件和其他 lib 文件创建 Spark 作业定义项
在以下示例中,我们将创建一个 Spark 作业定义项,该项如下:
- 名称为
SJDHelloWorld. - 主定义文件是
main.py:从默认 Lakehouse 读取 CSV 文件,并将其另存为 Delta 表,并将其保存回同一 Lakehouse。 - 其他 lib 文件是
libs.py,它具有一个实用工具函数,用于返回 CSV 文件和 Delta 表的名称。 - 默认的 Lakehouse 已指定为特定的 Lakehouse 工件 ID。
下面是用于创建 Spark 作业定义项的详细有效负载。
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
若要解码或编码详细的设置,可以在 Python 中使用以下帮助程序函数。 还有其他联机工具,例如 https://www.base64decode.org/ 可以执行相同的作业。
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
HTTP 代码 202 响应指示已成功创建 Spark 作业定义项。
获取 Spark 作业定义及其在 v2 格式下的定义部分
使用新的 v2 格式,获取包含定义部件的 Spark 作业定义项时,主定义文件的文件内容和其他 lib 文件都包含在响应有效负载中,base64 编码在 parts 字段下。 下面是获取包含定义部分的 Spark 作业定义项的示例:
- 首先,向终结点
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2发出 POST 请求。 确保格式查询参数的值为SparkJobDefinitionV2. - 然后,在响应标头中,检查 HTTP 状态代码。 HTTP 代码 202 指示请求已成功接受。 从响应标头中复制
x-ms-operation-id值。 - 最后,使用复制
https://api.fabric.microsoft.com/v1/operations/{operationId}的值向终结点x-ms-operation-id发出 GET 请求以获取作结果。 在响应有效负载中,definition字段包含 Spark 作业定义项的详细设置,包括主定义文件以及parts下的其他库文件。
使用 v2 格式的主定义文件和其他库文件更新 Spark 作业定义项
若要以 v2 格式使用主定义文件和其他库文件更新现有 Spark 作业定义项,可以使用与创建操作类似的负载结构。 下面是更新在上一部分创建的 Spark 作业定义项的示例:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
当使用上述数据包时,文件将进行以下更改:
- main.py 文件使用新内容进行更新。
- 从此 Spark 作业定义项中删除了 lib1.py,并且也从 OneLake 存储中删除。
- 新的 lib2.py 文件将添加到此 Spark 作业定义项,并上传到 OneLake 存储。
若要更新 Spark 作业定义项,请向具有上述有效负载的终结点 https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} 发出 POST 请求。 HTTP 代码 202 响应指示已成功更新 Spark 作业定义项。