Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Berlaku untuk:✅ Rekayasa Data dan Ilmu Data di Microsoft Fabric
Pelajari cara mengirimkan pekerjaan sesi Spark menggunakan Livy API untuk Fabric Data Engineering.
Penting
Fitur ini sedang dalam tahap pratinjau.
Prasyarat
Klien jarak jauh seperti Visual Studio Code dengan Jupyter Notebooks, PySpark, dan Microsoft Authentication Library (MSAL) untuk Python
Salah satu token aplikasi Microsoft Entra. Mendaftarkan aplikasi dengan platform identitas Microsoft
Atau token Microsoft Entra SPN. Menambahkan dan mengelola kredensial aplikasi di Microsoft Entra
Beberapa data di lakehouse Anda, pada contoh ini menggunakan
Komisi Taksi & Limousine NYC green_tripdata_2022_08 sebuah file parquet yang dimuat ke lakehouse
Livy API mendefinisikan titik akhir terpadu untuk operasi. Ganti tempat penampung {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID}, dan {Entra_ClientSecret} dengan nilai yang sesuai saat Anda mengikuti contoh dalam artikel ini.
Mengonfigurasi Visual Studio Code untuk Sesi API Livy Anda
Pilih Pengaturan Lakehouse di Fabric Lakehouse Anda.
Navigasikan ke bagian titik akhir Livy.
Salin string koneksi pekerjaan Sesi (kotak merah pertama dalam gambar) ke dalam kode Anda.
Navigasi ke pusat admin Microsoft Entra dan salin ID Aplikasi (klien) dan ID Direktori (penyewa) ke kode Anda.
Mengautentikasi sesi Livy API Spark menggunakan token pengguna Entra atau token Entra SPN
Mengautentikasi sesi Livy API Spark menggunakan token Entra SPN
Buat
.ipynb
buku catatan di Visual Studio Code dan sisipkan kode berikut.from msal import ConfidentialClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" client_secret = "Entra_ClientSecret" audience = "https://api.fabric.microsoft.com/.default" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Get the app-only token def get_app_only_token(tenant_id, client_id, client_secret, audience): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. Args: tenant_id (str): The Azure Active Directory tenant ID. client_id (str): The Service Principal's client ID. client_secret (str): The Service Principal's client secret. audience (str): The audience for the token (e.g., resource-specific scope). Returns: str: The access token. """ try: # Define the authority URL for the tenant authority = f"https://login.microsoftonline.com/{tenant_id}" # Create a ConfidentialClientApplication instance app = ConfidentialClientApplication( client_id = client_id, client_credential = client_secret, authority = authority ) # Acquire a token using the client credentials flow result = app.acquire_token_for_client(scopes = [audience]) # Check if the token was successfully retrieved if "access_token" in result: return result["access_token"] else: raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}") except Exception as e: print(f"Error retrieving token: {e}", fil = sys.stderr) sys.exit(1) token = get_app_only_token(tenant_id, client_id, client_secret, audience) api_base_url = 'https://api.fabric.microsoft.com/v1/' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + token} print(token)
Di Visual Studio Code, Anda akan melihat token Microsoft Entra dikembalikan.
Mengautentikasi sesi Livy API Spark menggunakan token pengguna Entra
Buat
.ipynb
buku catatan di Visual Studio Code dan sisipkan kode berikut.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority = "https://login.microsoftonline.com/"Entra_TenantID" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url ='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Di Visual Studio Code, Anda akan melihat token Microsoft Entra dikembalikan.
Membuat sesi Livy API Spark
Tambahkan sel buku catatan lain dan sisipkan kode ini.
create_livy_session = requests.post(livy_base_url, headers = headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json())
Jalankan sel buku catatan, Anda akan melihat satu baris dicetak saat sesi Livy dibuat.
Anda dapat memverifikasi bahwa sesi Livy dibuat dengan menggunakan [Lihat pekerjaan Anda di hub Pemantauan](#View pekerjaan Anda di hub Pemantauan).
Integrasi dengan Fabric Environments
Secara bawaan, sesi Livy API ini berjalan dengan kumpulan starter bawaan untuk ruang kerja. Atau Anda dapat menggunakan Fabric Environments Membuat, mengonfigurasi, dan menggunakan lingkungan di Microsoft Fabric untuk menyesuaikan kumpulan Spark yang digunakan sesi Livy API untuk pekerjaan Spark ini. Untuk menggunakan Fabric Environment, cukup perbarui sel notebook sebelumnya dengan payload JSON ini.
create_livy_session = requests.post(livy_base_url, headers = headers, json = {
"conf" : {
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
}
)
Mengirimkan pernyataan spark.sql menggunakan sesi Livy API Spark
Tambahkan sel buku catatan lain dan sisipkan kode ini.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Jalankan sel buku catatan, Anda akan melihat beberapa baris inkremental yang dicetak saat pekerjaan dikirimkan dan hasilnya dikembalikan.
Mengirimkan pernyataan spark.sql kedua menggunakan sesi Livy API Spark
Tambahkan sel buku catatan lain dan sisipkan kode ini.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Jalankan sel buku catatan, Anda akan melihat beberapa baris inkremental yang dicetak saat pekerjaan dikirimkan dan hasilnya dikembalikan.
Tutup sesi Livy dengan pernyataan ketiga
Tambahkan sel buku catatan lain dan sisipkan kode ini.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, header = headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers = headers) print (response)
Menampilkan pekerjaan Anda di hub Pemantauan
Anda dapat mengakses hub Pemantauan untuk melihat berbagai aktivitas Apache Spark dengan memilih Pantau di tautan navigasi sisi kiri.
Saat sesi sedang berlangsung atau dalam status selesai, Anda dapat melihat status sesi dengan menavigasi ke Monitor.
Pilih dan buka nama aktivitas terbaru.
Dalam kasus sesi Livy API ini, Anda dapat melihat pengiriman sesi sebelumnya, detail eksekusi, versi Spark, dan konfigurasi. Perhatikan status berhenti di kanan atas.
Untuk merangkum seluruh proses, Anda memerlukan klien remote seperti Visual Studio Code, token aplikasi Microsoft Entra/SPN, URL titik akhir Livy API, autentikasi terhadap Lakehouse Anda, dan akhirnya Session Livy API.