Sdílet prostřednictvím


Jak vytvořit a aktualizovat definici úlohy Sparku ve formátu V2 prostřednictvím rozhraní Microsoft Fabric REST API

Definice úlohy Sparku (SJD) je typ položky infrastruktury, která uživatelům umožňuje definovat a spouštět úlohy Apache Sparku v Microsoft Fabric. Rozhraní API pro definici úloh Sparku v2 umožňuje uživatelům vytvářet a aktualizovat položky definice úloh Sparku novým formátem, který se nazývá SparkJobDefinitionV2. Hlavní výhodou použití formátu v2 je, že umožňuje uživatelům spravovat hlavní spustitelný soubor a další soubory knihovny pomocí jednoho volání rozhraní API, místo použití rozhraní API úložiště k nahrání souborů samostatně, není k správě souborů potřeba žádný token úložiště.

Požadavky

  • Pro přístup k rozhraní REST API fabric se vyžaduje token Microsoft Entra. Doporučujeme použít knihovnu MSAL (Microsoft Authentication Library) pro získání tokenu. Další informace naleznete v tématu Podpora toku ověřování v MSAL.

Rozhraní Microsoft Fabric REST API definuje jednotný koncový bod pro operace CRUD s položkami Fabric. Koncový bod je https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Přehled formátu definice úlohy Spark v2

V datové části správy položky definice úlohy Sparku definition se pole používá k určení podrobného nastavení položky definice úlohy Sparku. Pole definition obsahuje dvě dílčí pole: format a parts. Pole format určuje formát položky definice úlohy Sparku, která by měla být SparkJobDefinitionV2 pro formát v2.

Pole parts je pole, které obsahuje podrobné nastavení položky definice úlohy Sparku. Každá položka v parts poli představuje část podrobného nastavení. Každá část obsahuje tři dílčí pole: path, payloada payloadType. Pole path určuje cestu části, payload pole určuje obsah části, která je zakódovaná v base64, a payloadType pole určuje typ datové části, která by měla být InlineBase64.

Důležité

Tento formát verze 2 podporuje pouze definice úloh Sparku s formáty souborů .py nebo .scala. Formát souboru .jar není podporovaný.

Vytvoření položky definice úlohy Sparku s hlavním definičním souborem a dalšími soubory lib

V následujícím příkladu vytvoříme položku definice úlohy Sparku, která:

  1. Název je SJDHelloWorld.
  2. Hlavní definiční soubor je main.py, což je číst soubor CSV z výchozího Lakehouse a uložit jako tabulku Delta zpět do stejného Lakehouse.
  3. Jiný knihovna soubor je libs.py, který má užitkovou funkci pro vrácení názvu souboru CSV a Delta tabulky.
  4. Výchozí Lakehouse je nastaveno na konkrétní ID artefaktu Lakehouse.

Následuje detailní užitečné zatížení pro vytvoření položky definice úlohy Spark.

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

K dekódování nebo kódování podrobného nastavení můžete použít následující pomocné funkce v Pythonu. Existují také další online nástroje, jako je například https://www.base64decode.org/, které mohou vykonávat stejnou úlohu.

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Odpověď HTTP s kódem 202 označuje, že položka definice úlohy Spark byla úspěšně vytvořena.

Získání definice úlohy Sparku s částmi definice ve formátu v2

Při použití nového formátu verze 2, kdy dojde k získání položky definice úlohy pro Spark s částmi definice, je obsah hlavního souboru definice a ostatních lib souborů zahrnut v datové části odpovědi, zakódován ve formátu base64 pod polem parts. Tady je příklad získání položky definice úlohy Sparku s částmi definice:

  1. Nejprve proveďte požadavek POST na koncový bod https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Ujistěte se, že hodnota parametru dotazu formátu je SparkJobDefinitionV2.
  2. Pak v hlavičce odpovědi zkontrolujte stavový kód HTTP. Kód HTTP 202 označuje, že požadavek byl úspěšně přijat. x-ms-operation-id Zkopírujte hodnotu z hlaviček odpovědi.
  3. Nakonec na koncový bod https://api.fabric.microsoft.com/v1/operations/{operationId} vytvořte požadavek GET s zkopírovanou x-ms-operation-id hodnotou, abyste získali výsledek operace. V odpovědní datové části `definition` obsahuje pole podrobnou konfiguraci položky definice úlohy Spark, včetně hlavního definičního souboru a dalších souborů knihoven uvedených v poli `parts`.

Aktualizace položky definice úlohy Sparku pomocí hlavního definičního souboru a dalších souborů lib ve formátu v2

Pokud chcete aktualizovat existující položku definice úlohy Sparku pomocí hlavního definičního souboru a dalších knihovních souborů ve formátu v2, můžete použít podobnou strukturu datové části jako při operaci vytvoření. Tady je příklad aktualizace položky definice úlohy Sparku vytvořené v předchozí části:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

S výše uvedenou datovou částí se v souborech provádějí následující změny:

  1. Soubor main.py se aktualizuje o nový obsah.
  2. Z této položky definice úlohy Sparku se odstraní lib1.py a odebere se také z úložiště OneLake.
  3. Do této položky definice úlohy Sparku se přidá nový soubor lib2.py a nahraje se do úložiště OneLake.

Pokud chcete aktualizovat položku definice úlohy Sparku, vytvořte do koncového bodu https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} požadavek POST s výše uvedenou datovou částí. Odpověď HTTP s kódem 202 znamená, že položka definice úlohy Sparku byla úspěšně aktualizována.