Leggere in inglese

Condividi tramite


Gestire Azure Data Lake Analytics con Python

Importante

Azure Data Lake Analytics è stato ritirato il 29 febbraio 2024. Per altre informazioni, vedere questo annuncio.

Per l'analisi dei dati, l'organizzazione può usare Azure Synapse Analytics o Microsoft Fabric.

Questo articolo descrive come gestire account, origini dati, utenti e processi di Azure Data Lake Analytics usando Python.

Versioni di Python supportate

  • Usare una versione a 64 bit di Python.
  • È possibile usare la distribuzione Standard di Python disponibile in Python.org download.
  • Molti sviluppatori trovano utile usare la distribuzione di Anaconda Python.
  • Questo articolo è stato scritto usando Python versione 3.6 dalla distribuzione Standard di Python

Installare Azure Python SDK

Installare i moduli seguenti:

  • Il modulo azure-mgmt-resource include altri moduli di Azure per Active Directory e così via.
  • Il modulo azure-datalake-store include le operazioni del file system di Azure Data Lake Store.
  • Il modulo azure-mgmt-datalake-store include le operazioni di gestione degli account di Azure Data Lake Store.
  • Il modulo azure-mgmt-datalake-analytics include le operazioni di Azure Data Lake Analytics.

Prima di tutto, assicurarsi di avere la versione più recente pip eseguendo il comando seguente:

python -m pip install --upgrade pip

Questo documento è stato scritto utilizzando pip version 9.0.1.

Usare i comandi seguenti pip per installare i moduli dalla riga di comando:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Creare un nuovo script Python

Incollare il codice seguente nello script:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Eseguire questo script per verificare che i moduli possano essere importati.

Autenticazione

Autenticazione utente interattiva con un popup

Questo metodo non è supportato.

Autenticazione utente interattiva con un codice del dispositivo

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Autenticazione non interattiva con SPI e un segreto

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Autenticazione non interattiva con l'API e un certificato

Questo metodo non è supportato.

Variabili di script comuni

Queste variabili vengono usate negli esempi.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Creare i clienti

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Creare un gruppo di risorse di Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Creare un account Data Lake Analytics

Creare prima di tutto un account dello Store.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Crea quindi un account ADLA che utilizza quell'archivio.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Invia un lavoro

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Attendere la fine di un compito

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Elencare pipeline e ricorrenze

A seconda che i carichi di lavoro abbiano metadati di pipeline o ricorrenza collegati, puoi elencare pipeline e ricorrenze.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Gestire i criteri di calcolo

L'oggetto DataLakeAnalyticsAccountManagementClient fornisce metodi per la gestione dei criteri di calcolo per un account Data Lake Analytics.

Elencare i criteri di calcolo

Il codice seguente recupera un elenco di criteri di calcolo per un account Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Creare un nuovo criterio di calcolo

Il codice seguente crea un nuovo criterio di elaborazione per un account Data Lake Analytics, impostando il numero massimo di unità di elaborazione disponibili per l'utente specificato su 50 e la priorità minima del processo su 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Passaggi successivi