Поделиться через


Управление Azure Data Lake Analytics с помощью Python

Важно!

Поддержка Azure Data Lake Analytics прекращена 29 февраля 2024 г. Дополнительные сведения см. в этом объявлении.

Для аналитики данных ваша организация может использовать Azure Synapse Analytics или Microsoft Fabric.

В этой статье описано, как управлять учетными записями, источниками данных, пользователями и заданиями Azure Data Lake Analytics с помощью Python.

Поддерживаемые версии Python

Установка пакета Azure SDK для Python

Установите следующие модули:

  • Модуль azure-mgmt-resource содержит другие модули Azure для Active Directory и др.
  • Модуль azure-datalake-store содержит операции файловой системы Azure Data Lake Store.
  • Модуль azure-mgmt-datalake-store содержит операции по управлению учетной записью Azure Data Lake Store.
  • Модуль azure-mgmt-datalake-analytics содержит операции Azure Data Lake Analytics.

Во-первых, убедитесь, что установлена актуальная версия компонента pip, выполнив следующую команду:

python -m pip install --upgrade pip

При написании этого документа использовался компонент pip version 9.0.1.

Чтобы установить модули, используйте следующие команды pip в командной строке.

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Создание сценария Python

Скопируйте приведенный ниже код и вставьте его в сценарий.

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Запустите этот сценарий, чтобы проверить, можно ли импортировать модули.

Аутентификация

Интерактивная аутентификация пользователей с помощью всплывающего окна

Этот метод не поддерживается.

Интерактивная аутентификация пользователей с помощью кода устройства

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Неинтерактивная аутентификация с помощью SPI и секрета

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Неинтерактивная аутентификация с помощью API и секрета

Этот метод не поддерживается.

Общие переменные сценария

Эти переменные используются в примерах.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Создание клиентов

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Создание группы ресурсов Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Создание учетной записи аналитики озера данных

Сначала создайте учетную запись хранилища.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Затем создайте учетную запись ADLA, которая использует это хранилище.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Отправка задания

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Ожидание завершения задания

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Вывод списка конвейеров и повторений

В зависимости от того, вложены ли в задания метаданные конвейеров или повторений, можно отобразить список конвейеров и повторений.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Управление политиками вычислений

Объект DataLakeAnalyticsAccountManagementClient предоставляет методы для управления политиками вычислений для учетной записи Data Lake Analytics.

Вывод списка политик вычислений

Следующий код извлекает список политик вычислений для учетной записи Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Создание новой политики вычислений

Следующий код создает новую политику вычислений для учетной записи Data Lake Analytics, задавая максимальное количество AU, доступных для указанного пользователя, равным 50 и минимальный приоритет задания равным 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Дальнейшие действия