Gestire Azure Data Lake Analytics con Python

Importante

Azure Data Lake Analytics ritirato il 29 febbraio 2024. Altre informazioni con questo annuncio.

Per l'analisi dei dati, l'organizzazione può usare Azure Synapse Analytics o Microsoft Fabric.

Questo articolo descrive come gestire utenti, processi, origini dati e account Azure Data Lake Analytics tramite Python.

Versioni di Python supportate

  • Usare una versione a 64 bit di Python.
  • È possibile usare la distribuzione Python standard disponibile in Python.org download.
  • Molti sviluppatori hanno la comodità di usare la distribuzione python di Anaconda.
  • Questo articolo si basa su Python versione 3.6 della distribuzione di Python standard

Installare Azure Python SDK

Installare i moduli seguenti:

  • Il modulo azure-mgmt-resource include altri moduli di Azure per Active Directory e così via.
  • Il modulo azure-datalake-store include le operazioni di file system di Azure Data Lake Store.
  • Il modulo azure-mgmt-datalake-store include le operazioni di gestione degli account di Azure Data Lake Store.
  • Il modulo azure-mgmt-datalake-analytics include le operazioni di Azure Data Lake Analytics.

Assicurarsi prima di tutto di avere la versione più recente di pip usando il comando seguente:

python -m pip install --upgrade pip

Questo documento si basa su pip version 9.0.1.

Per installare i moduli dalla riga di comando, usare i comandi pip seguenti:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Creare un nuovo script Python

Incollare il codice seguente nello script:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Eseguire questo script per verificare che i moduli possano essere importati.

Authentication

Autenticazione utente interattiva con un popup

Questo metodo non è supportato.

Autenticazione utente interattiva con un codice di dispositivo

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Autenticazione non interattiva con una SPI e un segreto

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Autenticazione non interattiva con un'API e un certificato

Questo metodo non è supportato.

Variabili dello script comuni

Negli esempi sono usate queste variabili.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Creare i client

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Creare un gruppo di risorse di Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Creare un account di Analisi Data Lake

Creare prima un account di archiviazione.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Creare quindi un account ADLA che usa tale archivio.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Inviare un processo

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Attendere la fine di un processo

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Elencare pipeline e ricorrenze

A seconda se i processi hanno pipeline o metadati associati, è possibile elencare le pipeline e le ricorrenze.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Gestire i criteri di calcolo

L'oggetto DataLakeAnalyticsAccountManagementClient offre metodi per gestire i criteri di calcolo per un account Data Lake Analytics.

Elencare i criteri di calcolo

Il codice seguente recupera un elenco di criteri di calcolo per un account Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Creare un nuovo criterio di calcolo

Il codice seguente crea un nuovo criterio di calcolo per un account Data Lake Analytics, impostando il massimo di unità di analisi disponibile per l'utente specificato su 50 e la priorità minima del processo su 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Passaggi successivi