Dela via


Använda Livy-API:et för att skicka och köra sessionsjobb

Gäller för:✅ Dataingenjör ing och Datavetenskap i Microsoft Fabric

Lär dig hur du skickar Spark-jobb med Livy-API:et för datateknik i Fabric.

Viktigt!

Den här funktionen är i förhandsversion.

Förutsättningar

Livy-API:et definierar en enhetlig slutpunkt för åtgärder. Ersätt platshållarna {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID} och {Entra_ClientSecret} med lämpliga värden när du följer exemplen i den här artikeln.

Konfigurera Visual Studio Code för din Livy API-session

  1. Välj Lakehouse-inställningar i din Fabric Lakehouse.

    Skärmbild som visar Lakehouse-inställningar.

  2. Gå till Livy-endpointsektionen.

    skärmbild som visar Lakehouse Livy-slutpunkt och sessionsjobbs anslutningssträng.

  3. Kopiera sessionsjobbets anslutningssträng (den första röda rutan i bilden) till din kod.

  4. Gå till administrationscentret för Microsoft Entra och kopiera både program-ID:t (klient-) och katalog-ID:t (klientorganisation) till din kod.

    Skärmbild som visar Översikt över Livy API-appen i administrationscentret för Microsoft Entra.

Autentisera en Livy API Spark-session med antingen en Entra-användartoken eller en Entra SPN-token

Autentisera en Livy API Spark-session med en Entra SPN-token

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod.

    from msal import ConfidentialClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID" 
    client_id = "Entra_ClientID"
    client_secret = "Entra_ClientSecret"
    audience = "https://api.fabric.microsoft.com/.default"  
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Get the app-only token
    def get_app_only_token(tenant_id, client_id, client_secret, audience):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID.
            client_id (str): The Service Principal's client ID.
            client_secret (str): The Service Principal's client secret.
            audience (str): The audience for the token (e.g., resource-specific scope).
    
        Returns:
            str: The access token.
        """
        try:
            # Define the authority URL for the tenant
            authority = f"https://login.microsoftonline.com/{tenant_id}"
    
            # Create a ConfidentialClientApplication instance
            app = ConfidentialClientApplication(
                client_id = client_id,
                client_credential = client_secret,
                authority = authority
            )
    
            # Acquire a token using the client credentials flow
            result = app.acquire_token_for_client(scopes = [audience])
    
            # Check if the token was successfully retrieved
            if "access_token" in result:
                return result["access_token"]
            else:
                raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}")
        except Exception as e:
            print(f"Error retrieving token: {e}", fil = sys.stderr)
            sys.exit(1)
    
    token = get_app_only_token(tenant_id, client_id, client_secret, audience)
    
    api_base_url = 'https://api.fabric.microsoft.com/v1/'
    livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"    
    headers = {"Authorization": "Bearer " + token}
    
    print(token)
    
    
  2. I Visual Studio Code bör du se att Microsoft Entra-token returneras.

    Skärmbild som visar den Microsoft Entra SPN-token som returnerades efter att cellen körts. ```

Autentisera en Livy API Spark-session med en Entra-användartoken

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
            authority = "https://login.microsoftonline.com/"Entra_TenantID"
        )
    
     result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
     result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
     "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
    print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url ='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. I Visual Studio Code bör du se att Microsoft Entra-token returneras.

    Skärmbild som visar microsoft Entra-användartoken som returneras efter att cellen har körts.

Skapa en Livy API Spark-session

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    create_livy_session = requests.post(livy_base_url, headers = headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    print(get_session_response.json())
    
  2. Kör notebook-cellen. Du bör se en rad som skrivs ut när Livy-sessionen skapas.

    Skärmbild som visar resultatet av den första notebook-cellkörningen.

  3. Du kan kontrollera att Livy-sessionen har skapats med hjälp av [Visa dina jobb i övervakningshubben](#View dina jobb i övervakningshubben).

Integrering med Fabric-miljöer

Som standard körs den här Livy API-sessionen mot standardstartpoolen för arbetsytan. Du kan också använda Infrastrukturmiljöer Skapa, konfigurera och använda en miljö i Microsoft Fabric för att anpassa Spark-poolen som Livy API-sessionen använder för dessa Spark-jobb. Om du vill använda en Fabric-miljö uppdaterar du bara den föregående notebook-cellen med den här json-nyttolasten.

create_livy_session = requests.post(livy_base_url, headers = headers, json = {
    "conf" : {
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
        }
)

Skicka en spark.sql-instruktion med Livy API Spark-sessionen

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
            "kind": "spark"
        }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Kör notebook-cellen. Du bör se flera rader skrivna ut stegvis när uppgiften skickas och resultaten returneras.

    Skärmbild som visar resultatet av den första notebook-cellen med Spark.sql körning.

Skicka en andra spark.sql-instruktion med Livy API Spark-sessionen

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Kör notebook-cellen. Du bör se flera rader skrivna ut stegvis när uppgiften skickas och resultaten returneras.

    Skärmbild som visar resultatet av den andra notebook-cellkörningen.

Stäng Livy-sessionen med ett tredje kommando

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, header = headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers = headers)
    print (response)
    

Visa dina jobb i övervakningshubben

Du kan komma åt övervakningshubben för att visa olika Apache Spark-aktiviteter genom att välja Övervaka i navigeringslänkarna till vänster.

  1. När sessionen pågår eller är i slutfört tillstånd kan du visa sessionsstatusen genom att gå till Övervaka.

    Skärmbild som visar tidigare Livy API-inlämningar i övervakningshubben.

  2. Välj och öppna det senaste aktivitetsnamnet.

    Skärmbild som visar den senaste Livy API-aktiviteten i övervakningshubben.

  3. I det här Livy API-sessionsfallet kan du se dina tidigare sessioner, körningsinformation, Spark-versioner och konfiguration. Observera den stoppade statusen längst upp till höger.

    Skärmbild som visar den senaste livy-API-aktivitetsinformationen i övervakningshubben.

För att sammanfatta hela processen behöver du en fjärrklient, till exempel Visual Studio Code, en Microsoft Entra-app/SPN-token, Livy API-slutpunkts-URL, autentisering mot Lakehouse och slutligen ett Livy-API för session.