Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Gäller för:✅ Dataingenjör ing och Datavetenskap i Microsoft Fabric
Lär dig hur du skickar Spark-jobb med Livy-API:et för datateknik i Fabric.
Viktigt!
Den här funktionen är i förhandsversion.
Förutsättningar
Fabric Premium - eller utvärderingskapacitet med en Lakehouse
En fjärrklient som Visual Studio Code med Jupyter Notebooks, PySpark och Microsoft Authentication Library (MSAL) för Python
Antingen ett Microsoft Entra-apptoken. Registrera ett program med Microsofts identitetsplattform
Eller en Microsoft Entra SPN-token. Lägga till och hantera programautentiseringsuppgifter i Microsoft Entra
Viss data i ditt lakehouse, i det här exemplet används NYC Taxi & Limousine Commission green_tripdata_2022_08, en parquet-fil som lästs in till lakehouse.
Livy-API:et definierar en enhetlig slutpunkt för åtgärder. Ersätt platshållarna {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID} och {Entra_ClientSecret} med lämpliga värden när du följer exemplen i den här artikeln.
Konfigurera Visual Studio Code för din Livy API-session
Välj Lakehouse-inställningar i din Fabric Lakehouse.
Gå till Livy-endpointsektionen.
Kopiera sessionsjobbets anslutningssträng (den första röda rutan i bilden) till din kod.
Gå till administrationscentret för Microsoft Entra och kopiera både program-ID:t (klient-) och katalog-ID:t (klientorganisation) till din kod.
Autentisera en Livy API Spark-session med antingen en Entra-användartoken eller en Entra SPN-token
Autentisera en Livy API Spark-session med en Entra SPN-token
Skapa en
.ipynb
notebook-fil i Visual Studio Code och infoga följande kod.from msal import ConfidentialClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" client_secret = "Entra_ClientSecret" audience = "https://api.fabric.microsoft.com/.default" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Get the app-only token def get_app_only_token(tenant_id, client_id, client_secret, audience): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. Args: tenant_id (str): The Azure Active Directory tenant ID. client_id (str): The Service Principal's client ID. client_secret (str): The Service Principal's client secret. audience (str): The audience for the token (e.g., resource-specific scope). Returns: str: The access token. """ try: # Define the authority URL for the tenant authority = f"https://login.microsoftonline.com/{tenant_id}" # Create a ConfidentialClientApplication instance app = ConfidentialClientApplication( client_id = client_id, client_credential = client_secret, authority = authority ) # Acquire a token using the client credentials flow result = app.acquire_token_for_client(scopes = [audience]) # Check if the token was successfully retrieved if "access_token" in result: return result["access_token"] else: raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}") except Exception as e: print(f"Error retrieving token: {e}", fil = sys.stderr) sys.exit(1) token = get_app_only_token(tenant_id, client_id, client_secret, audience) api_base_url = 'https://api.fabric.microsoft.com/v1/' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + token} print(token)
I Visual Studio Code bör du se att Microsoft Entra-token returneras.
Autentisera en Livy API Spark-session med en Entra-användartoken
Skapa en
.ipynb
notebook-fil i Visual Studio Code och infoga följande kod.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority = "https://login.microsoftonline.com/"Entra_TenantID" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url ='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
I Visual Studio Code bör du se att Microsoft Entra-token returneras.
Skapa en Livy API Spark-session
Lägg till ytterligare en notebook-cell och infoga den här koden.
create_livy_session = requests.post(livy_base_url, headers = headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json())
Kör notebook-cellen. Du bör se en rad som skrivs ut när Livy-sessionen skapas.
Du kan kontrollera att Livy-sessionen har skapats med hjälp av [Visa dina jobb i övervakningshubben](#View dina jobb i övervakningshubben).
Integrering med Fabric-miljöer
Som standard körs den här Livy API-sessionen mot standardstartpoolen för arbetsytan. Du kan också använda Infrastrukturmiljöer Skapa, konfigurera och använda en miljö i Microsoft Fabric för att anpassa Spark-poolen som Livy API-sessionen använder för dessa Spark-jobb. Om du vill använda en Fabric-miljö uppdaterar du bara den föregående notebook-cellen med den här json-nyttolasten.
create_livy_session = requests.post(livy_base_url, headers = headers, json = {
"conf" : {
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
}
)
Skicka en spark.sql-instruktion med Livy API Spark-sessionen
Lägg till ytterligare en notebook-cell och infoga den här koden.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Kör notebook-cellen. Du bör se flera rader skrivna ut stegvis när uppgiften skickas och resultaten returneras.
Skicka en andra spark.sql-instruktion med Livy API Spark-sessionen
Lägg till ytterligare en notebook-cell och infoga den här koden.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Kör notebook-cellen. Du bör se flera rader skrivna ut stegvis när uppgiften skickas och resultaten returneras.
Stäng Livy-sessionen med ett tredje kommando
Lägg till ytterligare en notebook-cell och infoga den här koden.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, header = headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers = headers) print (response)
Visa dina jobb i övervakningshubben
Du kan komma åt övervakningshubben för att visa olika Apache Spark-aktiviteter genom att välja Övervaka i navigeringslänkarna till vänster.
När sessionen pågår eller är i slutfört tillstånd kan du visa sessionsstatusen genom att gå till Övervaka.
Välj och öppna det senaste aktivitetsnamnet.
I det här Livy API-sessionsfallet kan du se dina tidigare sessioner, körningsinformation, Spark-versioner och konfiguration. Observera den stoppade statusen längst upp till höger.
För att sammanfatta hela processen behöver du en fjärrklient, till exempel Visual Studio Code, en Microsoft Entra-app/SPN-token, Livy API-slutpunkts-URL, autentisering mot Lakehouse och slutligen ett Livy-API för session.