Бөлісу құралы:


Отправка и выполнение пакетных заданий Livy с помощью API Livy

Применимо к:✅ Проектирование данных и наука о данных в Fabric

Узнайте, как отправлять пакетные задания Spark с помощью API Livy для проектирования данных Fabric. В настоящее время API Livy не поддерживает служебный принципал Azure (SPN).

Предварительные требования

API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} и {Fabric_LakehouseID} соответствующими значениями при выполнении примеров в этой статье.

Настройка Visual Studio Code для пакетной службы API Livy

  1. Выберите Параметры Lakehouse в вашем Fabric Lakehouse.

    Снимок экрана: параметры Lakehouse.

  2. Перейдите к разделу конечной точки Livy.

    screenshot с конечной точкой Lakehouse Livy и строкой подключения сеанса.

  3. Скопируйте строку подключения задачи пакетной обработки (второе красное поле на изображении) в ваш код.

  4. Перейдите в центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.

    Скриншот с обзором приложения API Livy в административном центре Microsoft Entra.

Создайте пакетный код Spark и загрузите его в Lakehouse

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Сохраните файл Python локально. Этот код на Python содержит два оператора Spark, которые работают с данными в Lakehouse и необходимо загрузить в ваш Lakehouse. Вам нужен путь ABFS полезных данных для ссылки на пакетное задание API Livy в Visual Studio Code и имя таблицы Lakehouse в инструкции Select SQL.

    Снимок экрана, который показывает ячейку с кодом загрузки на Python.

  3. Загрузите пейлоад Python в раздел "Файлы" вашего Lakehouse. В обозревателе Lakehouse выберите "Файлы". Затем выберите >Получить данные>Загрузить файлы. Выберите файлы с помощью средства выбора файлов.

    Снимок экрана, показывающий нагрузку в разделе

  4. После того как файл окажется в разделе "Файлы" в Lakehouse, нажмите на три точки справа от имени файла и выберите "Свойства".

    Снимок экрана, показывающий путь ABFS данных в свойствах файла в Lakehouse.

  5. Скопируйте этот путь ABFS в ячейку записной книжки на шаге 1.

Аутентификация пакетного сеанса API Spark Livy с помощью токена пользователя Microsoft Entra или токена пользователя-службы Microsoft Entra.

Аутентификация пакетного сеанса API Spark Livy с помощью токена SPN Microsoft Entra

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. Запустите ячейку записной книжки, вы увидите возвращенный маркер Microsoft Entra.

    Скриншот с маркером Microsoft Entra SPN, возвращенным после выполнения ячейки.

Аутентификация сеанса API Livy для Spark с использованием токена пользователя Microsoft Entra

  1. Создайте записную книжку .ipynb в Visual Studio Code и вставьте следующий код.

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Microsoft Fabric API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",        # Read lakehouse metadata
        "https://api.fabric.microsoft.com/Item.ReadWrite.All",        # Read/write fabric items
        "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",   # Access workspace operations
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",    # Access storage from code
        "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",     # Access Azure Key Vault
        "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", # Access Azure Data Explorer
        "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",     # Access Azure Data Lake
        "https://api.fabric.microsoft.com/Code.AccessFabric.All"             # General Fabric access
    ]
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. Запустите ячейку записной книжки, всплывающее окно должно появиться в браузере, позволяющее выбрать удостоверение для входа.

    Скриншот с экраном входа в систему приложения Microsoft Entra.

  3. После выбора удостоверения для входа необходимо утвердить разрешения API регистрации приложений Microsoft Entra.

    Скриншот с разрешениями API приложения Microsoft Entra.

  4. Закройте окно браузера после завершения проверки подлинности.

    Снимок экрана: проверка подлинности завершена.

  5. В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.

    Скриншот, показывающий токен Microsoft Entra, возвращенный после выполнения ячейки и входа в систему.

Отправьте Livy пакет и отслеживайте запланированное задание.

  1. Добавьте еще одну ячейку записной книжки и вставьте этот код.

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. Запустите ячейку ноутбука, и вы увидите несколько строк, выводимых на экран во время создания и выполнения задания пакета Livy.

    Скриншот с результатами в Visual Studio Code после успешной отправки пакетного задания Livy.

  3. Чтобы увидеть изменения, вернитесь в вашу Lakehouse.

Интеграция с средами Fabric

По умолчанию этот сеанс API Livy выполняется в стандартном начальном пуле рабочей области. Кроме того, можно использовать среды Fabric Создать, настроить и использовать среду в Microsoft Fabric для кастомизации пула Spark, используемого сеансом API Livy для этих заданий Spark. Чтобы использовать среду Fabric, обновите предыдущие ячейки записной книжки с помощью этого изменения одной строки.

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

Просмотр заданий в центре мониторинга

Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.

  1. После завершения пакетного задания можно просмотреть состояние сеанса, перейдя к монитору.

    Снимок экрана: предыдущие отправки API Livy в центре мониторинга.

  2. Выберите и откройте самое последнее название действия.

    Снимок экрана: последнее действие API Livy в центре мониторинга.

  3. В этом случае сеанса API Livy можно просмотреть предыдущую пакетную отправку, сведения о выполнении, версии Spark и конфигурацию. Обратите внимание на остановленное состояние в правом верхнем углу.

    снимок экрана: последние сведения о действиях API Livy в центре мониторинга.

Чтобы резюмировать весь процесс, вам потребуется удаленный клиент, например Visual Studio Code, токен приложения Microsoft Entra, URL-адрес конечной точки API Livy, аутентификация с вашим Lakehouse, данные Spark в вашем Lakehouse, и, наконец, сеанс пакетного режима API Livy.