Поделиться через


Использование API Livy для отправки и выполнения заданий сеанса

Область применения:✅ Инжиниринг данных и Обработка и анализ данных в Microsoft Fabric

Узнайте, как отправлять задания сеансов Spark с помощью API Livy для проектирования данных Fabric.

Это важно

Эта функция доступна в предварительной версии.

Предварительные условия

API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID} и {Entra_ClientSecret} соответствующими значениями, когда вы следуете примерам в этой статье.

Настройка Visual Studio Code для сеанса API Livy

  1. Выберите Параметры Lakehouse в вашем Fabric Lakehouse.

    Снимок экрана: параметры Lakehouse.

  2. Перейдите к разделу конечной точки Livy.

    Снимок экрана с конечной точкой Lakehouse Livy и строкой подключения задания сеанса.

  3. Скопируйте строку подключения задания сеанса (первое красное поле на изображении) в ваш код.

  4. Перейдите в Центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.

    Снимок экрана: обзор приложения API Livy в Центре администрирования Microsoft Entra.

Аутентифицировать сеанс Spark API Livy с помощью токена пользователя Entra или SPN токена Entra

Аутентифицируйте сеанс Spark API Livy с помощью токена SPN Entra.

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    from msal import ConfidentialClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID" 
    client_id = "Entra_ClientID"
    client_secret = "Entra_ClientSecret"
    audience = "https://api.fabric.microsoft.com/.default"  
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Get the app-only token
    def get_app_only_token(tenant_id, client_id, client_secret, audience):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID.
            client_id (str): The Service Principal's client ID.
            client_secret (str): The Service Principal's client secret.
            audience (str): The audience for the token (e.g., resource-specific scope).
    
        Returns:
            str: The access token.
        """
        try:
            # Define the authority URL for the tenant
            authority = f"https://login.microsoftonline.com/{tenant_id}"
    
            # Create a ConfidentialClientApplication instance
            app = ConfidentialClientApplication(
                client_id = client_id,
                client_credential = client_secret,
                authority = authority
            )
    
            # Acquire a token using the client credentials flow
            result = app.acquire_token_for_client(scopes = [audience])
    
            # Check if the token was successfully retrieved
            if "access_token" in result:
                return result["access_token"]
            else:
                raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}")
        except Exception as e:
            print(f"Error retrieving token: {e}", fil = sys.stderr)
            sys.exit(1)
    
    token = get_app_only_token(tenant_id, client_id, client_secret, audience)
    
    api_base_url = 'https://api.fabric.microsoft.com/v1/'
    livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"    
    headers = {"Authorization": "Bearer " + token}
    
    print(token)
    
    
  2. В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.

    Снимок экрана, показывающий токен SPN Microsoft Entra, возвращенный после выполнения ячейки. ```

Аутентификация сеанса Spark в Livy API с использованием токена пользователя Entra

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
            authority = "https://login.microsoftonline.com/"Entra_TenantID"
        )
    
     result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
     result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
     "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
    print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url ='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.

    Снимок экрана, показывающий возвращенный токен пользователя Microsoft Entra после выполнения команды в ячейке.

Создание сеанса Livy API Spark

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    create_livy_session = requests.post(livy_base_url, headers = headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    print(get_session_response.json())
    
  2. Запустите ячейку записной книжки, вы увидите одну строку, напечатанную при создании сеанса Livy.

    Снимок экрана, показывающий результаты выполнения первой ячейки в записной книжке.

  3. Вы можете убедиться, что сеанс Livy создан, используя [просмотр ваших заданий в узле мониторинга](#View-your-jobs-in-the-Monitoring-hub).

Интеграция с средами Fabric

По умолчанию этот сеанс API Livy выполняется в стандартном начальном пуле рабочей области. Кроме того, вы можете использовать среды Microsoft Fabric создайте, настройте и используйте среду в Microsoft Fabric, чтобы настроить пул Spark, который используется сеансом API Livy для этих заданий Spark. Чтобы использовать среду Fabric, просто обновите предыдущую ячейку блокнота с JSON полезной нагрузкой.

create_livy_session = requests.post(livy_base_url, headers = headers, json = {
    "conf" : {
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
        }
)

Отправка инструкции spark.sql с помощью сеанса Spark API Livy

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
            "kind": "spark"
        }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.

    Снимок экрана, показывающий результаты выполнения первой ячейки записной книжки с помощью Spark.sql.

Отправка второй инструкции spark.sql с помощью сеанса Spark API Livy

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers = headers)
    
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers = headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    
    execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url + "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers = headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
    # Make the next request
    get_statement_response = requests.get(get_statement, headers = headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.

    Снимок экрана, показывающий результаты выполнения второй ячейки блокнота.

Закройте сеанс Livy с третьей инструкцией

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, header = headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers = headers)
    print (response)
    

Просмотр заданий в центре мониторинга

Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.

  1. Когда сеанс выполняется или находится в состоянии завершения, можно просмотреть состояние сеанса, перейдя к монитору.

    Снимок экрана: предыдущие отправки API Livy в центре мониторинга.

  2. Выберите и откройте название последнего действия.

    Снимок экрана: последнее действие API Livy в центре мониторинга.

  3. В этой сессии API Livy можно просмотреть предыдущие отправки сеансов, подробности выполнения, версии Spark и конфигурацию. Обратите внимание на остановленное состояние в правом верхнем углу.

    снимок экрана: последние сведения о действиях API Livy в центре мониторинга.

Чтобы подвести итоги всего процесса, вам потребуется удаленный клиент, например Visual Studio Code, токен приложения Microsoft Entra/SPN, URL-адрес конечной точки Livy API, аутентификация в вашем Lakehouse и, наконец, API Session Livy.