Guide pratique pour créer et mettre à jour une définition de tâche Spark avec l’API REST de Microsoft Fabric
L’API REST Microsoft Fabric fournit un point de terminaison de service pour les opérations CRUD des éléments Fabric. Dans ce tutoriel, nous suivons un scénario de bout en bout sur la création et la mise à jour d’un artefact de définition de tâche Spark. Il existe trois étapes générales :
- créer un élément de définition de tâche Spark avec un état initial
- charger le fichier de définition principal et d’autres fichiers lib
- mettre à jour l’élément de définition de tâche Spark avec l’URL OneLake du fichier de définition principal et d’autres fichiers lib
Prérequis
- Un jeton Microsoft Entra est nécessaire pour accéder à l’API REST Fabric. La bibliothèque MSAL est recommandée pour obtenir le jeton. Pour plus d’informations, consultez l’article Prise en charge du flux d’authentification dans MSAL.
- Un jeton de stockage est nécessaire pour accéder à l’API OneLake. Pour plus d’informations, consultez l’article MSAL pour Python.
Créer un élément de définition de tâche Spark avec l’état initial
L’API REST Microsoft Fabric définit un point de terminaison unifié pour les opérations CRUD des éléments Fabric. Le point de terminaison a la valeur https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
.
Le détail de l’élément est spécifié dans le corps de la demande. Voici un exemple de corps de la demande pour la création d’un élément de définition de tâche Spark :
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
"payloadType": "InlineBase64"
}
]
}
}
Dans cet exemple, l’élément de définition de tâche Spark est nommé SJDHelloWorld
. Le champ payload
représente le contenu codé au format base64 de la configuration détaillée. Après décodage, le contenu est le suivant :
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
Voici deux fonctions d’assistance pour coder et décoder la configuration détaillée :
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Voici l’extrait de code permettant de créer un élément de définition de tâche Spark :
import requests
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
Charger le fichier de définition principal et d’autres fichiers lib
Un jeton de stockage est nécessaire pour charger le fichier dans OneLake. Voici une fonction d’assistance qui permet d’obtenir le jeton de stockage :
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"{client id}", # this filed should be the client id
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
Nous avons créé un élément de définition de tâche Spark. Pour le rendre exécutable, nous devons configurer le fichier de définition principal et les propriétés nécessaires. Le point de terminaison pour le chargement du fichier de cet élément SJD est https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}
. Le même « workspaceId » que celui de l’étape précédente doit être utilisé. La valeur de « sjdartifactid » est disponible dans le corps de réponse de l’étape précédente. Voici l’extrait de code permettant de configurer le fichier de définition principal :
import requests
# three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")
# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file
appendPosition = 0;
appendAction = "append";
### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type" : "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")
# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file
flushAction = "flush";
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
Suivez le même processus pour charger les autres fichiers lib, si nécessaire.
Mettre à jour l’élément de définition de tâche Spark avec l’URL OneLake du fichier de définition principal et d’autres fichiers lib
Jusqu’à maintenant, nous avons créé un élément de définition de tâche Spark avec un état initial. Nous avons également chargé le fichier de définition principal et d’autres fichiers lib. La dernière étape consiste à mettre à jour l’élément de définition de tâche Spark pour définir les propriétés d’URL du fichier de définition principal et des autres fichiers lib. Le point de terminaison permettant de mettre à jour l’élément de définition de tâche Spark est https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}
. Utilisez les mêmes « workspaceId » et « sjdartifactid » qu’aux étapes précédentes. Voici l’extrait de code permettant de mettre à jour l’élément de définition de tâche Spark :
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id
updateRequestBodyJson = {
"executableFile":mainAbfssPath,
"defaultLakehouseArtifactId":defaultLakehouseId,
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":None,
"commandLineArguments":"",
"additionalLibraryUris":[libsAbfssPath],
"language":"Python",
"environmentArtifactId":None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
Pour récapituler l’ensemble du processus, l’API REST Fabric et l’API OneLake sont nécessaires afin de créer et mettre à jour un élément de définition de tâche Spark. L’API REST de Fabric permet de créer et de mettre à jour l’élément de définition de tâche Spark. L’API de OneLake permet de charger le fichier de définition principal et d’autres fichiers lib. Le fichier de définition principal et d’autres fichiers lib sont d’abord chargés dans OneLake. Ensuite, les propriétés d’URL du fichier de définition principal et d’autres fichiers lib sont définies dans l’élément de définition de tâche Spark.