Muistiinpano
Tämän sivun käyttö edellyttää valtuutusta. Voit yrittää kirjautua sisään tai vaihtaa hakemistoa.
Tämän sivun käyttö edellyttää valtuutusta. Voit yrittää vaihtaa hakemistoa.
Koskee seuraavia:✅ Microsoft Fabricin tietotekniikka ja datatiede
Opi lähettämään Spark-erätöitä Fabric Data Engineeringin Livy-ohjelmointirajapinnan avulla. Livy-ohjelmointirajapinta ei tällä hetkellä tue Azuren palvelun päänimeä (SPN).
Edellytykset
Fabric Premium - tai kokeiluversion kapasiteetti Lakehousella.
Etäasiakas, kuten Visual Studio Code , jossa on Jupyter Notebooks, PySpark ja Microsoft Authentication Library (MSAL) for Python.
Fabric Rest -ohjelmointirajapinnan käyttämiseen vaaditaan Microsoft Entra -sovellustunnus. Rekisteröi sovellus Microsoftin käyttäjätietoympäristössä.
Jotkin tiedot lakehousessa, tässä esimerkissä käytetään NEWC Taxi & Limousine Commission green_tripdata_2022_08 lakehouseen ladattua parquet-tiedostoa.
Livy-ohjelmointirajapinta määrittää toimintojen yhtenäisen päätepisteen. Korvaa paikkamerkit {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} ja {Fabric_LakehouseID} sopivilla arvoilla, kun noudatat tämän artikkelin esimerkkejä.
Visual Studio Coden määrittäminen Livy-ohjelmointirajapintaerälle
Valitse Lakehouse-asetukset Fabric Lakehouse -kohteessasi.
Siirry Livy-päätepisteosioon.
Kopioi erätyön yhteysmerkkijono (kuvan toinen punainen ruutu) koodiin.
Siirry Microsoft Entra -hallintakeskukseen ja kopioi sekä Sovelluksen (asiakkaan) että hakemiston (vuokraajan) tunnus koodiin.
Luo Spark Batch -koodi ja lataa se Lakehouseen
.ipynbLuo muistikirja Visual Studio Codessa ja lisää seuraava koodiimport sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("batch_demo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") tableName = spark_context.getConf().get("spark.targetTable") if tableName is not None: print("tableName: " + str(tableName)) else: print("tableName is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4)) deltaTablePath = f"Tables/{tableName}CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)Tallenna Python-tiedosto paikallisesti. Tämä Python-koodin tiedot sisältää kaksi Spark-laskelmaa, jotka käsittelevät Lakehousessa olevia tietoja ja jotka on ladattava Lakehouse-palvelimeesi. Tarvitset hyötykuorman ABFS-polun, johon viittaa Livy-ohjelmointirajapinnan erätyössä Visual Studio Codessa ja Lakehouse-taulukon nimen Valitse SQL -lausekkeessa.
Lataa Python-tiedot Lakehousen tiedostot-osioon. Valitse Lakehousen resurssienhallinnassa Tiedostot. Valitse sitten >Nouda tiedot>Lataa tiedostot. Valitse tiedostot tiedostovalitsimen kautta.
Kun tiedosto on Lakehousen Tiedostot-osassa, napsauta kolmea pistettä hyötykuormatiedoston nimen oikealla puolella ja valitse Ominaisuudet.
Kopioi tämä ABFS-polku muistikirjan soluun vaiheessa 1.
Livy API Spark -eräistunnon todentaminen joko Microsoft Entran käyttäjätunnuksen tai Microsoft Entra SPN -tunnuksen avulla
Livy API Spark -eräistunnon todentaminen Microsoft Entra SPN -tunnuksen avulla
.ipynbLuo muistikirja Visual Studio Codessa ja lisää seuraava koodi.import sys from msal import ConfidentialClientApplication # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Service Principal Application ID # Certificate paths - Update these paths to your certificate files certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem" # Public certificate file private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem" # Private key file certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint # OAuth settings audience = "https://analysis.windows.net/powerbi/api/.default" authority = f"https://login.windows.net/{tenant_id}" def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. This function uses certificate-based authentication which is more secure than client secrets. Args: client_id (str): The Service Principal's client ID audience (str): The audience for the token (resource scope) authority (str): The OAuth authority URL certificate_path (str): Path to the certificate file (.pem format) private_key_path (str): Path to the private key file (.pem format) certificate_thumbprint (str): Certificate thumbprint (optional but recommended) Returns: str: The access token for API authentication Raises: Exception: If token acquisition fails """ try: # Read the certificate from PEM file with open(certificate_path, "r", encoding="utf-8") as f: certificate_pem = f.read() # Read the private key from PEM file with open(private_key_path, "r", encoding="utf-8") as f: private_key_pem = f.read() # Create the confidential client application app = ConfidentialClientApplication( client_id=client_id, authority=authority, client_credential={ "private_key": private_key_pem, "thumbprint": certificate_thumbprint, "certificate": certificate_pem } ) # Acquire token using client credentials flow token_response = app.acquire_token_for_client(scopes=[audience]) if "access_token" in token_response: print("Successfully acquired access token") return token_response["access_token"] else: raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}") except FileNotFoundError as e: print(f"Certificate file not found: {e}") sys.exit(1) except Exception as e: print(f"Error retrieving token: {e}", file=sys.stderr) sys.exit(1) # Get the access token token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)Suorita muistikirjan solu, jolloin Microsoft Entra -tunnus palautetaan.
Livy API Spark -istunnon todentaminen Microsoft Entra -käyttäjätunnuksen avulla
.ipynbLuo muistikirja Visual Studio Codessa ja lisää seuraava koodi.from msal import PublicClientApplication import requests import time # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Application ID (can be the same as above or different) # Required scopes for Microsoft Fabric API access scopes = [ "https://api.fabric.microsoft.com/Lakehouse.Execute.All", # Execute operations in lakehouses "https://api.fabric.microsoft.com/Lakehouse.Read.All", # Read lakehouse metadata "https://api.fabric.microsoft.com/Item.ReadWrite.All", # Read/write fabric items "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", # Access workspace operations "https://api.fabric.microsoft.com/Code.AccessStorage.All", # Access storage from code "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", # Access Azure Key Vault "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", # Access Azure Data Lake "https://api.fabric.microsoft.com/Code.AccessFabric.All" # General Fabric access ] def get_access_token(tenant_id, client_id, scopes): """ Get an access token using interactive authentication. This method will open a browser window for user authentication. Args: tenant_id (str): The Azure Active Directory tenant ID client_id (str): The application client ID scopes (list): List of required permission scopes Returns: str: The access token, or None if authentication fails """ app = PublicClientApplication( client_id, authority=f"https://login.microsoftonline.com/{tenant_id}" ) print("Opening browser for interactive authentication...") token_response = app.acquire_token_interactive(scopes=scopes) if "access_token" in token_response: print("Successfully authenticated") return token_response["access_token"] else: print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}") return None # Uncomment the lines below to use interactive authentication token = get_access_token(tenant_id, client_id, scopes) print("Access token acquired via interactive login")Suorita muistikirjasolu. Selaimeen pitäisi ilmestyä ponnahdusikkuna, jonka avulla voit valita kirjautumisen käyttäjätiedot.
Kun olet valinnut käyttäjätiedot, joilla kirjaudut sisään, sinun on hyväksyttävä Microsoft Entra -sovelluksen rekisteröinnin ohjelmointirajapinnan käyttöoikeudet.
Sulje selainikkuna todentamisen suorittamisen jälkeen.
Sinun pitäisi nähdä Microsoft Entra -tunnus Visual Studio Codessa.
Lähetä Livy-erä ja valvo erätyötä.
Lisää toinen muistikirjasolu ja lisää tämä koodi.
# submit payload to existing batch session import requests import time import json api_base_url = "https://api.fabric.microsoft.com/v1" # Base URL for Fabric APIs # Fabric Resource IDs - Replace with your workspace and lakehouse IDs workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Construct the Livy Batch API URL # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches" # Set up authentication headers headers = {"Authorization": f"Bearer {token}"} print(f"Livy Batch API URL: {livy_base_url}") new_table_name = "TABLE_NAME" # Name for the new table # Configure the batch job print("Configuring batch job parameters...") # Batch job configuration - Modify these values for your use case payload_data = { # Job name - will appear in the Fabric UI "name": f"livy_batch_demo_{new_table_name}", # Path to your Python file in the lakehouse "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>", # Replace with your Python file path # Optional: Spark configuration parameters "conf": { "spark.targetTable": new_table_name, # Custom configuration for your application }, } print("Batch Job Configuration:") print(json.dumps(payload_data, indent=2)) try: # Submit the batch job print("\nSubmitting batch job...") post_batch = requests.post(livy_base_url, headers=headers, json=payload_data) if post_batch.status_code == 202: batch_info = post_batch.json() print("Livy batch job submitted successfully!") print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}") # Extract batch ID for monitoring batch_id = batch_info['id'] livy_batch_get_url = f"{livy_base_url}/{batch_id}" print(f"\nBatch Job ID: {batch_id}") print(f"Monitoring URL: {livy_batch_get_url}") else: print(f"Failed to submit batch job. Status code: {post_batch.status_code}") print(f"Response: {post_batch.text}") except requests.exceptions.RequestException as e: print(f"Network error occurred: {e}") except json.JSONDecodeError as e: print(f"JSON decode error: {e}") print(f"Response text: {post_batch.text}") except Exception as e: print(f"Unexpected error: {e}")Suorita muistikirjasolu. Livy Batch -työn luomisen ja suorittamisen aikana pitäisi näkyä useita rivejä.
Jos haluat nähdä muutokset, siirry takaisin Lakehouseen.
Integrointi Fabric-ympäristöihin
Tämä Livy-ohjelmointirajapinnan istunto suoritetaan oletusarvoisesti työtilan oletusarvoista aloitussarjaa vasten. Vaihtoehtoisesti voit käyttää Fabric-ympäristöjä Luo, määritä ja käytä Microsoft Fabric -ympäristöä , jotta voit mukauttaa Spark-varantoa, jota Livy-ohjelmointirajapinta-istunto käyttää näissä Spark-työpaikoissa. Jos haluat käyttää Fabric-ympäristöä, päivitä edellinen muistikirjan solu tällä yhden rivin muutoksella.
payload_data = {
"name":"livybatchdemo_with"+ newlakehouseName,
"file":"abfss://YourABFSPathToYourPayload.py",
"conf": {
"spark.targetLakehouse": "Fabric_LakehouseID",
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}" # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
}
}
Tarkastele töitäsi valvontakeskuksessa
Voit käyttää valvontakeskusta tarkastelemaan erilaisia Apache Spark -toimintoja valitsemalla Vasemmanpuoleisten siirtymislinkkien Valvonta.
Kun erätyö on suoritettu, voit tarkastella istunnon tilaa siirtymällä kohtaan Valvonta.
Valitse ja avaa viimeisimmän toiminnon nimi.
Tässä Livy-ohjelmointirajapinta-istunnon tapauksessa voit tarkastella edellisen erän lähettämistä, suoritustietoja, Spark-versioita ja määritystä. Huomaa pysäytetty tila oikeassa yläkulmassa.
Jotta voit tiivistää koko prosessin, tarvitset etäasiakkaan, kuten Visual Studio Coden, Microsoft Entra -sovellustunnuksen, Livy-ohjelmointirajapinnan päätepisteen URL-osoitteen, todennuksen Lakehousea vastaan, Spark-hyötykuorman Lakehousessa ja lopuksi erän Livy API -istunnon.