Bagikan melalui


Gunakan Livy API untuk mengirimkan dan menjalankan tugas sesi

Berlaku untuk:✅ Rekayasa Data dan Ilmu Data di Microsoft Fabric

Pelajari cara mengirimkan pekerjaan sesi Spark menggunakan Livy API untuk Fabric Data Engineering.

Penting

Fitur ini sedang dalam tahap pratinjau.

Prasyarat

Livy API mendefinisikan titik akhir terpadu untuk operasi. Ganti tempat penampung {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID}, dan {Entra_ClientSecret} dengan nilai yang sesuai saat Anda mengikuti contoh dalam artikel ini.

Mengonfigurasi Visual Studio Code untuk Sesi API Livy Anda

  1. Pilih Pengaturan Lakehouse di Fabric Lakehouse Anda.

    Cuplikan layar memperlihatkan pengaturan Lakehouse.

  2. Navigasikan ke bagian titik akhir Livy.

    cuplikan layar memperlihatkan titik akhir Lakehouse Livy dan string koneksi pekerjaan Sesi.

  3. Salin string koneksi pekerjaan Sesi (kotak merah pertama dalam gambar) ke dalam kode Anda.

  4. Navigasi ke pusat admin Microsoft Entra dan salin ID Aplikasi (klien) dan ID Direktori (penyewa) ke kode Anda.

    Cuplikan layar memperlihatkan gambaran umum aplikasi Livy API di pusat admin Microsoft Entra.

Mengautentikasi sesi Livy API Spark menggunakan token pengguna Entra atau token Entra SPN

Mengautentikasi sesi Livy API Spark menggunakan token Entra SPN

  1. Buat .ipynb buku catatan di Visual Studio Code dan sisipkan kode berikut.

    from msal import ConfidentialClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID" 
    client_id = "Entra_ClientID"
    client_secret = "Entra_ClientSecret"
    audience = "https://api.fabric.microsoft.com/.default"  
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Get the app-only token
    def get_app_only_token(tenant_id, client_id, client_secret, audience):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID.
            client_id (str): The Service Principal's client ID.
            client_secret (str): The Service Principal's client secret.
            audience (str): The audience for the token (e.g., resource-specific scope).
    
        Returns:
            str: The access token.
        """
        try:
            # Define the authority URL for the tenant
            authority = f"https://login.microsoftonline.com/{tenant_id}"
    
            # Create a ConfidentialClientApplication instance
            app = ConfidentialClientApplication(
                client_id = client_id,
                client_credential = client_secret,
                authority = authority
            )
    
            # Acquire a token using the client credentials flow
            result = app.acquire_token_for_client(scopes = [audience])
    
            # Check if the token was successfully retrieved
            if "access_token" in result:
                return result["access_token"]
            else:
                raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}")
        except Exception as e:
            print(f"Error retrieving token: {e}", fil = sys.stderr)
            sys.exit(1)
    
    token = get_app_only_token(tenant_id, client_id, client_secret, audience)
    
    api_base_url = 'https://api.fabric.microsoft.com/v1/'
    livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"    
    headers = {"Authorization": "Bearer " + token}
    
    print(token)
    
    
  2. Di Visual Studio Code, Anda akan melihat token Microsoft Entra dikembalikan.

    Cuplikan layar memperlihatkan token Microsoft Entra SPN dikembalikan setelah menjalankan sel. ```

Mengautentikasi sesi Livy API Spark menggunakan token pengguna Entra

  1. Buat .ipynb buku catatan di Visual Studio Code dan sisipkan kode berikut.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
            authority = "https://login.microsoftonline.com/"Entra_TenantID"
        )
    
     result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
     result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
     "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
    print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url ='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Di Visual Studio Code, Anda akan melihat token Microsoft Entra dikembalikan.

    Cuplikan layar memperlihatkan token pengguna Microsoft Entra yang dikembalikan setelah menjalankan sel.

Membuat sesi Livy API Spark

  1. Tambahkan sel buku catatan lain dan sisipkan kode ini.

    create_livy_session = requests.post(livy_base_url, headers = headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    print(get_session_response.json())
    
  2. Jalankan sel buku catatan, Anda akan melihat satu baris dicetak saat sesi Livy dibuat.

    Cuplikan layar memperlihatkan hasil eksekusi sel notebook pertama.

  3. Anda dapat memverifikasi bahwa sesi Livy dibuat dengan menggunakan [Lihat pekerjaan Anda di hub Pemantauan](#View pekerjaan Anda di hub Pemantauan).

Integrasi dengan Fabric Environments

Secara bawaan, sesi Livy API ini berjalan dengan kumpulan starter bawaan untuk ruang kerja. Atau Anda dapat menggunakan Fabric Environments Membuat, mengonfigurasi, dan menggunakan lingkungan di Microsoft Fabric untuk menyesuaikan kumpulan Spark yang digunakan sesi Livy API untuk pekerjaan Spark ini. Untuk menggunakan Fabric Environment, cukup perbarui sel notebook sebelumnya dengan payload JSON ini.

create_livy_session = requests.post(livy_base_url, headers = headers, json = {
    "conf" : {
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
        }
)

Mengirimkan pernyataan spark.sql menggunakan sesi Livy API Spark

  1. Tambahkan sel buku catatan lain dan sisipkan kode ini.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
            "kind": "spark"
        }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Jalankan sel buku catatan, Anda akan melihat beberapa baris inkremental yang dicetak saat pekerjaan dikirimkan dan hasilnya dikembalikan.

    Cuplikan layar memperlihatkan hasil sel buku catatan pertama dengan eksekusi Spark.sql.

Mengirimkan pernyataan spark.sql kedua menggunakan sesi Livy API Spark

  1. Tambahkan sel buku catatan lain dan sisipkan kode ini.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Jalankan sel buku catatan, Anda akan melihat beberapa baris inkremental yang dicetak saat pekerjaan dikirimkan dan hasilnya dikembalikan.

    Cuplikan layar memperlihatkan hasil eksekusi sel buku catatan kedua.

Tutup sesi Livy dengan pernyataan ketiga

  1. Tambahkan sel buku catatan lain dan sisipkan kode ini.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, header = headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers = headers)
    print (response)
    

Menampilkan pekerjaan Anda di hub Pemantauan

Anda dapat mengakses hub Pemantauan untuk melihat berbagai aktivitas Apache Spark dengan memilih Pantau di tautan navigasi sisi kiri.

  1. Saat sesi sedang berlangsung atau dalam status selesai, Anda dapat melihat status sesi dengan menavigasi ke Monitor.

    Cuplikan layar memperlihatkan pengiriman LIVY API sebelumnya di hub Pemantauan.

  2. Pilih dan buka nama aktivitas terbaru.

    Cuplikan layar memperlihatkan aktivitas Livy API terbaru di hub Pemantauan.

  3. Dalam kasus sesi Livy API ini, Anda dapat melihat pengiriman sesi sebelumnya, detail eksekusi, versi Spark, dan konfigurasi. Perhatikan status berhenti di kanan atas.

    Cuplikan layar memperlihatkan detail aktivitas Livy API terbaru di hub Pemantauan.

Untuk merangkum seluruh proses, Anda memerlukan klien remote seperti Visual Studio Code, token aplikasi Microsoft Entra/SPN, URL titik akhir Livy API, autentikasi terhadap Lakehouse Anda, dan akhirnya Session Livy API.