Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Область применения:✅ Инжиниринг данных и Обработка и анализ данных в Microsoft Fabric
Узнайте, как отправлять задания сеансов Spark с помощью API Livy для проектирования данных Fabric.
Это важно
Эта функция доступна в предварительной версии.
Предварительные условия
Емкость Fabric Premium или пробная версия с помощью Lakehouse
Удаленный клиент, например Visual Studio Code с Jupyter Notebook, PySpark и библиотекой проверки подлинности Майкрософт (MSAL) для Python
Либо токен приложения Microsoft Entra. Зарегистрируйте приложение на платформе идентификации Microsoft
Или токен SPN Microsoft Entra. Добавление учетных данных приложения и управление ими в Microsoft Entra
Некоторые данные в озере данных, в этом примере используется Комиссия по такси и лимузинам Нью-Йорка parquet файл green_tripdata_2022_08, загруженный в озеро данных.
API Livy определяет единую конечную точку для операций. Замените заполнители {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}, {Fabric_LakehouseID} и {Entra_ClientSecret} соответствующими значениями, когда вы следуете примерам в этой статье.
Настройка Visual Studio Code для сеанса API Livy
Выберите Параметры Lakehouse в вашем Fabric Lakehouse.
Перейдите к разделу конечной точки Livy.
Скопируйте строку подключения задания сеанса (первое красное поле на изображении) в ваш код.
Перейдите в Центр администрирования Microsoft Entra и скопируйте идентификатор приложения (клиента) и идентификатор каталога (клиента) в код.
Аутентифицировать сеанс Spark API Livy с помощью токена пользователя Entra или SPN токена Entra
Аутентифицируйте сеанс Spark API Livy с помощью токена SPN Entra.
Создайте записную книжку
.ipynb
в Visual Studio Code и вставьте следующий код.from msal import ConfidentialClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" client_secret = "Entra_ClientSecret" audience = "https://api.fabric.microsoft.com/.default" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Get the app-only token def get_app_only_token(tenant_id, client_id, client_secret, audience): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. Args: tenant_id (str): The Azure Active Directory tenant ID. client_id (str): The Service Principal's client ID. client_secret (str): The Service Principal's client secret. audience (str): The audience for the token (e.g., resource-specific scope). Returns: str: The access token. """ try: # Define the authority URL for the tenant authority = f"https://login.microsoftonline.com/{tenant_id}" # Create a ConfidentialClientApplication instance app = ConfidentialClientApplication( client_id = client_id, client_credential = client_secret, authority = authority ) # Acquire a token using the client credentials flow result = app.acquire_token_for_client(scopes = [audience]) # Check if the token was successfully retrieved if "access_token" in result: return result["access_token"] else: raise Exception("Failed to retrieve token: {result.get('error_description', 'Unknown error')}") except Exception as e: print(f"Error retrieving token: {e}", fil = sys.stderr) sys.exit(1) token = get_app_only_token(tenant_id, client_id, client_secret, audience) api_base_url = 'https://api.fabric.microsoft.com/v1/' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + token} print(token)
В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.
Аутентификация сеанса Spark в Livy API с использованием токена пользователя Entra
Создайте записную книжку
.ipynb
в Visual Studio Code и вставьте следующий код.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority = "https://login.microsoftonline.com/"Entra_TenantID" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes = ["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item. ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url ='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
В Visual Studio Code вы увидите возвращенный токен Microsoft Entra.
Создание сеанса Livy API Spark
Добавьте еще одну ячейку записной книжки и вставьте этот код.
create_livy_session = requests.post(livy_base_url, headers = headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json())
Запустите ячейку записной книжки, вы увидите одну строку, напечатанную при создании сеанса Livy.
Вы можете убедиться, что сеанс Livy создан, используя [просмотр ваших заданий в узле мониторинга](#View-your-jobs-in-the-Monitoring-hub).
Интеграция с средами Fabric
По умолчанию этот сеанс API Livy выполняется в стандартном начальном пуле рабочей области. Кроме того, вы можете использовать среды Microsoft Fabric создайте, настройте и используйте среду в Microsoft Fabric, чтобы настроить пул Spark, который используется сеансом API Livy для этих заданий Spark. Чтобы использовать среду Fabric, просто обновите предыдущую ячейку блокнота с JSON полезной нагрузкой.
create_livy_session = requests.post(livy_base_url, headers = headers, json = {
"conf" : {
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
}
)
Отправка инструкции spark.sql с помощью сеанса Spark API Livy
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.
Отправка второй инструкции spark.sql с помощью сеанса Spark API Livy
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers = headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers = headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers = headers, json = payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url + "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers = headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers = headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Запустите ячейку записной книжки, вы увидите несколько добавочных строк, напечатанных по мере отправки задания и возвращаемых результатов.
Закройте сеанс Livy с третьей инструкцией
Добавьте еще одну ячейку записной книжки и вставьте этот код.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, header = headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers = headers) print (response)
Просмотр заданий в центре мониторинга
Вы можете получить доступ к центру мониторинга для просмотра различных действий Apache Spark, выбрав монитор в левой части ссылок навигации.
Когда сеанс выполняется или находится в состоянии завершения, можно просмотреть состояние сеанса, перейдя к монитору.
Выберите и откройте название последнего действия.
В этой сессии API Livy можно просмотреть предыдущие отправки сеансов, подробности выполнения, версии Spark и конфигурацию. Обратите внимание на остановленное состояние в правом верхнем углу.
Чтобы подвести итоги всего процесса, вам потребуется удаленный клиент, например Visual Studio Code, токен приложения Microsoft Entra/SPN, URL-адрес конечной точки Livy API, аутентификация в вашем Lakehouse и, наконец, API Session Livy.