Del via


Brug Livy-API'en til at sende og udføre Livy-batchjob

Gælder for:✅ Dataudvikler ing og datavidenskab i Microsoft Fabric

Få mere at vide om, hvordan du indsender Spark-batchjob ved hjælp af Livy-API'en til Fabric Data Engineering. Livy-API'en understøtter i øjeblikket ikke Azure Service Principal (SPN).

Forudsætninger

Livy-API'en definerer et samlet slutpunkt for handlinger. Erstat pladsholderne {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} og {Fabric_LakehouseID} med de relevante værdier, når du følger eksemplerne i denne artikel.

Konfigurer Visual Studio Code for dit Livy API-batch

  1. Vælg Lakehouse-indstillinger i Fabric Lakehouse.

    Skærmbillede, der viser indstillingerne for Lakehouse.

  2. Gå til sektionen Livy-slutpunkt .

    skærmbillede, der viser Lakehouse Livy-slutpunktet og sessionsjobbet forbindelsesstreng.

  3. Kopiér batchjobbet forbindelsesstreng (det andet røde felt på billedet) til din kode.

  4. Gå til Microsoft Entra Administration , og kopiér både program-id'et (klient)-id'et og mappe-id'et (lejer) til din kode.

    Skærmbillede, der viser oversigt over Livy API-app i Microsoft Entra Administration.

Opret en Spark Batch-kode, og upload den til dit Lakehouse

  1. Opret en .ipynb notesbog i Visual Studio Code, og indsæt følgende kode

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Gem Python-filen lokalt. Denne Python-kodedata indeholder to Spark-sætninger, der fungerer på data i et Lakehouse og skal uploades til dit Lakehouse. Du skal bruge ABFS-stien til nyttedataene, der skal refereres til i dit Livy API-batchjob i Visual Studio Code og dit Lakehouse-tabelnavn i Select SQL-sætningen.

    Skærmbillede, der viser Python-nyttedatacellen.

  3. Upload Python-nyttedataene til filafsnittet i lakehouse. I Lakehouse-stifinderen skal du vælge Filer. Vælg derefter >Hent data>Upload filer. Vælg filer via filvælgeren.

    Skærmbillede, der viser nyttedata i afsnittet Filer i Lakehouse.

  4. Når filen er i afsnittet Filer i Lakehouse, skal du klikke på de tre prikker til højre for dit payloadfilnavn og vælge Egenskaber.

    Skærmbillede, der viser ABFS-stien til nyttedata i egenskaberne for filen i Lakehouse.

  5. Kopiér denne ABFS-sti til din notesbogcelle i trin 1.

Godkend en Livy API Spark-batchsession ved hjælp af enten et Microsoft Entra-brugertoken eller et Microsoft Entra SPN-token

Godkend en Livy API Spark-batchsession ved hjælp af et Microsoft Entra SPN-token

  1. Opret en .ipynb notesbog i Visual Studio Code, og indsæt følgende kode.

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. Kør notebook-cellen, bør du kunne se Microsoft Entra-tokenet returneret.

    Skærmbillede, der viser Microsoft Entra SPN-tokenet, der returneres efter kørslen af cellen.

Godkend en Livy API Spark-session ved hjælp af et Microsoft Entra-brugertoken

  1. Opret en .ipynb notesbog i Visual Studio Code, og indsæt følgende kode.

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Microsoft Fabric API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",        # Read lakehouse metadata
        "https://api.fabric.microsoft.com/Item.ReadWrite.All",        # Read/write fabric items
        "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",   # Access workspace operations
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",    # Access storage from code
        "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
        "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
        "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
        "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
    ]
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. Kør notesbogcellen. Der vises et pop op-vindue i browseren, så du kan vælge den identitet, du vil logge på med.

    Skærmbillede, der viser logonskærmen til Microsoft Entra-appen.

  3. Når du har valgt den identitet, du vil logge på med, skal du godkende tilladelserne til Microsoft Entra-appregistrerings-API'en.

    Skærmbillede, der viser API-tilladelser til Microsoft Entra-appen.

  4. Luk browservinduet, når godkendelsen er fuldført.

    Skærmbillede, der viser, at godkendelsen er fuldført.

  5. I Visual Studio Code kan du se, at Microsoft Entra-tokenet returneres.

    Skærmbillede, der viser Det Microsoft Entra-token, der returneres efter at have kørt celle og logget på.

Send en Livy Batch, og overvåg batchjob.

  1. Tilføj en anden notesbogcelle, og indsæt denne kode.

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. Kør notesbogcellen. Du kan se flere linjer udskrevet, når Livy-batchjobbet oprettes og køres.

    Skærmbillede, der viser resultater i Visual Studio Code, når Livy-batchjobbet er sendt.

  3. Hvis du vil se ændringerne, skal du gå tilbage til dit søhus.

Integration med Fabric-miljøer

Denne Livy API-session kører som standard i forhold til standardstartgruppen for arbejdsområdet. Du kan også bruge Fabric Environments Til at oprette, konfigurere og bruge et miljø i Microsoft Fabric til at tilpasse Spark-puljen, som Livy API-sessionen bruger til disse Spark-job. Hvis du vil bruge dit Fabric Environment, skal du opdatere den tidligere notebook-celle med denne ene linjeændring.

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

Få vist dine job i overvågningshubben

Du kan få adgang til overvågningshubben for at få vist forskellige Apache Spark-aktiviteter ved at vælge Overvåg i navigationslinkene til venstre.

  1. Når batchjobbet er fuldført, kan du få vist sessionsstatussen ved at gå til Overvågning.

    Skærmbillede, der viser tidligere Livy API-indsendelser i overvågningshubben.

  2. Vælg og åbn det seneste aktivitetsnavn.

    Skærmbillede, der viser den seneste Livy API-aktivitet i overvågningshubben.

  3. I denne Livy API-session kan du se din tidligere batchafsendelse, køre detaljer, Spark-versioner og konfiguration. Læg mærke til den stoppet status øverst til højre.

    Skærmbillede, der viser de seneste Livy API-aktivitetsoplysninger i overvågningshubben.

Hvis du vil opsummere hele processen, skal du bruge en fjernklient, f.eks . Visual Studio Code, et Microsoft Entra-app-token, URL-adressen til Livy API-slutpunktet, godkendelse mod lakehouse, en Spark-nyttedata i dit Lakehouse og endelig en batch Livy-API-session.