Gerir Azure Data Lake Analytics usando Python

Este artigo descreve como gerir Azure Data Lake Analytics contas, fontes de dados, utilizadores e empregos utilizando Python.

Versões Python suportadas

  • Use uma versão de 64 bits de Python.
  • Você pode usar a distribuição padrão python encontrada em Python.org downloads.
  • Muitos desenvolvedores acham conveniente usar a distribuição de Anaconda Python.
  • Este artigo foi escrito usando a versão Python 3.6 da distribuição padrão python

Instalar o SDK de Python para o Azure

Instalar os seguintes módulos:

  • O módulo azure-mgmt-recurso inclui outros módulos Azure para Ative Directory, etc.
  • O módulo azure-datalake-store inclui as operações do sistema de ficheiros Azure Data Lake Store.
  • O módulo azure-mgmt-datalake-store inclui as operações de gestão de conta Azure Data Lake Store.
  • O módulo azure-mgmt-datalake-analytics inclui as operações Azure Data Lake Analytics.

Em primeiro lugar, certifique-se de que tem o mais recente pip através da execução do seguinte comando:

python -m pip install --upgrade pip

Este documento foi escrito utilizando pip version 9.0.1.

Utilize os seguintes pip comandos para instalar os módulos a partir da linha de comando:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Criar um novo script Python

Cole o seguinte código no script:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Execute este script para verificar se os módulos podem ser importados.

Autenticação

Autenticação interativa do utilizador com um pop-up

Este método não é apoiado.

Autenticação interativa do utilizador com um código de dispositivo

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Autenticação não interativa com SPI e um segredo

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Autenticação não interativa com API e certificado

Este método não é apoiado.

Variáveis comuns de script

Estas variáveis são usadas nas amostras.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Criar os clientes

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Criar um Grupo de Recursos do Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Criar conta de Data Lake Analytics

Primeiro criar uma conta na loja.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Em seguida, crie uma conta ADLA que utilize essa loja.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Submeter um emprego

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Esperar que um trabalho acabe.

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Listar gasodutos e recorrências

Dependendo se os seus trabalhos têm metadados de pipeline ou recorrência anexados, pode listar os oleodutos e as recorrências.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Gerir políticas de cálculo

O objeto DataLakeAnalyticsAccountManagementClient fornece métodos para gerir as políticas de cálculo para uma conta Data Lake Analytics.

Listar políticas de cálculo

O código seguinte recupera uma lista de políticas de computação para uma conta Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Criar uma nova política de computação

O código que se segue cria uma nova política de computação para uma conta Data Lake Analytics, definindo o máximo de IA disponível para o utilizador especificado para 50, e a prioridade mínima de trabalho para 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Passos seguintes