Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Spark Job Definition (SJD) è un tipo di elemento fabric che consente agli utenti di definire ed eseguire processi Apache Spark in Microsoft Fabric. L'API di definizione del processo Spark v2 consente agli utenti di creare e aggiornare gli elementi di definizione del processo Spark con un nuovo formato denominato SparkJobDefinitionV2. Il vantaggio principale dell'uso del formato v2 consiste nel fatto che consente agli utenti di gestire il file eseguibile principale e altri file di libreria con una singola chiamata API, invece di usare l'API di archiviazione per caricare i file separatamente, non è necessario più token di archiviazione per la gestione dei file.
Prerequisiti
- Per accedere all'API REST di Fabric, è necessario un token Microsoft Entra. Per ottenere il token, è consigliabile usare la libreria MSAL (Microsoft Authentication Library). Per altre informazioni, vedere Supporto del flusso di autenticazione in MSAL.
L'API REST di Microsoft Fabric definisce un endpoint unificato per le operazioni CRUD degli elementi di Fabric. L'endpoint è https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.
Panoramica del formato Spark Job Definition v2
Nel payload della gestione di un elemento di definizione di processo Spark, il campo definition viene utilizzato per specificare la configurazione dettagliata dell'elemento di definizione di processo Spark. Il definition campo contiene due sottocampi: format e parts. Il campo format specifica il formato dell'elemento Definizione Job Spark, che dovrebbe essere SparkJobDefinitionV2 nel formato v2.
Il parts campo è un array che contiene la configurazione dettagliata della Definizione del Job Spark. Ogni elemento nella parts matrice rappresenta una parte della configurazione dettagliata. Ogni parte contiene tre sottocampi: path, payloade payloadType. Il path campo specifica il percorso della parte, il payload campo specifica il contenuto della parte con codifica Base64 e il payloadType campo specifica il tipo del payload, che deve essere InlineBase64.
Importante
Questo formato v2 supporta solo le definizioni dei processi Spark con formati di file di .py o scala. Il formato di file .jar non è supportato.
Creare un elemento di Configurazione di un lavoro Spark con il file di definizione principale e altri file di libreria
Nell'esempio seguente verrà creato un elemento di definizione del job Spark che:
- Il nome è
SJDHelloWorld. - Il file di definizione principale è
main.py, che consiste nel leggere un file CSV dal lakehouse predefinito e salvare come tabella Delta nella stessa Lakehouse. - L'altro file lib è
libs.py, che ha una funzione di utilità per restituire il nome del file CSV e la tabella Delta. - Il Lakehouse predefinito è impostato su un ID artefatto Lakehouse specifico.
Di seguito è riportato il payload dettagliato per la creazione dell'elemento Definizione processo Spark.
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib1.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Per decodificare o codificare la configurazione dettagliata, è possibile usare le funzioni helper seguenti in Python. Ci sono anche altri strumenti online, https://www.base64decode.org/ ad esempio che possono eseguire lo stesso lavoro.
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Una risposta di codice HTTP 202 indica che l'elemento Definizione processo Spark è stato creato correttamente.
Ottenere la definizione del job Spark con parti di definizione nel formato v2
Con il nuovo formato v2, quando si ottiene un elemento Definizione Job di Spark con parti di definizione, il contenuto del file di definizione principale e altri file di libreria sono tutti inclusi nel payload della risposta, nel campo parts, codificato in base64. Ecco un esempio di come ottenere una definizione processo Spark con parti della definizione.
- Prima di tutto, effettuare una richiesta POST all'endpoint
https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Assicurati che il valore del parametro di query 'format' siaSparkJobDefinitionV2. - Quindi, nelle intestazioni della risposta, controlla il codice di stato HTTP. Un codice HTTP 202 indica che la richiesta è stata accettata correttamente. Copiare il valore
x-ms-operation-iddalle intestazioni della risposta. - Infine, effettuare una richiesta GET all'endpoint
https://api.fabric.microsoft.com/v1/operations/{operationId}con il valore copiatox-ms-operation-idper ottenere il risultato dell'operazione. Nel payload della risposta, il campodefinitioncontiene la configurazione dettagliata dell'elemento Definizione del Job Spark, incluso il file di definizione principale e altri file della libreria nel campoparts.
Aggiornare l'elemento Definizione processo Spark con il file di definizione principale e altri file lib nel formato v2
Per aggiornare un elemento di definizione di un job Spark esistente con il file di definizione principale e altri file lib nel formato v2, è possibile utilizzare una struttura di payload simile a quella dell'operazione di creazione. Di seguito è riportato un esempio di aggiornamento della definizione del processo Spark creato nella sezione precedente.
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV2",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Main/main.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
},
{
"path": "Libs/lib2.py",
"payload": "<REDACTED>",
"payloadType": "InlineBase64"
}
]
}
}
Con il payload precedente, vengono apportate le modifiche seguenti ai file:
- Il file main.py viene aggiornato con il nuovo contenuto.
- Il lib1.py viene eliminato da questo elemento di definizione del processo Spark e rimosso anche dall'archiviazione di OneLake.
- Un nuovo file lib2.py viene aggiunto a questo elemento della definizione del processo Spark e caricato nell'archiviazione OneLake.
Per aggiornare l'elemento Definizione del Job Spark, effettuare una richiesta POST all'endpoint https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} con il payload precedente. Una risposta di codice HTTP 202 indica che la voce della definizione del processo Spark è stata aggiornata correttamente.