Delen via


Azure Data Lake Analytics beheren met Python

Belangrijk

Azure Data Lake Analytics op 29 februari 2024 buiten gebruik gesteld. Meer informatie over deze aankondiging.

Voor gegevensanalyse kan uw organisatie gebruikmaken van Azure Synapse Analytics of Microsoft Fabric.

In dit artikel wordt beschreven hoe u Azure Data Lake Analytics-accounts, gegevensbronnen, gebruikers en taken beheert met behulp van Python.

Ondersteunde Python-versies

  • Gebruik een 64-bits versie van Python.
  • U kunt de standaarddistributie van Python gebruiken op Python.org downloads.
  • Veel ontwikkelaars vinden het handig om de Anaconda Python-distributie te gebruiken.
  • Dit artikel is geschreven met Python versie 3.6 van de standaard Python-distributie

Azure Python-SDK installeren

Installeer de volgende modules:

  • De module azure-mgmt-resource bevat andere Azure-modules voor Active Directory, enzovoort.
  • De module azure-datalake-store bevat de bestandssysteembewerkingen van Azure Data Lake Store.
  • De module azure-mgmt-datalake-store bevat de azure Data Lake Store-accountbeheerbewerkingen.
  • De module azure-mgmt-datalake-analytics bevat de Azure Data Lake Analytics-bewerkingen.

Zorg er eerst voor dat u over de meest recente pip beschikt door de volgende opdracht uit te voeren:

python -m pip install --upgrade pip

Dit document is geschreven met .pip version 9.0.1

Gebruik de volgende pip opdrachten om de modules te installeren vanaf de opdrachtregel:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Een nieuw Python-script maken

Plak de volgende code in het script:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Voer dit script uit om te controleren of de modules kunnen worden geïmporteerd.

Verificatie

Interactieve gebruikersverificatie met een pop-up

Deze methode wordt niet ondersteund.

Interactieve gebruikersverificatie met een apparaatcode

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Niet-interactieve verificatie met SPI en een geheim

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Niet-interactieve verificatie met API en een certificaat

Deze methode wordt niet ondersteund.

Algemene scriptvariabelen

Deze variabelen worden gebruikt in de voorbeelden.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

De clients maken

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Een Azure-resourcegroep maken

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Een Data Lake Analytics-account maken

Maak eerst een winkelaccount.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Maak vervolgens een ADLA-account dat gebruikmaakt van dat archief.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Een taak indienen

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Wachten tot een taak is beëindigd

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Pijplijnen en terugkeerpatronen weergeven

Afhankelijk van of aan uw taken pijplijn- of terugkeergegevens zijn gekoppeld, kunt u pijplijnen en terugkeerpatronen weergeven.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Rekenbeleid beheren

Het object DataLakeAnalyticsAccountManagementClient biedt methoden voor het beheren van het rekenbeleid voor een Data Lake Analytics-account.

Rekenbeleid weergeven

Met de volgende code wordt een lijst met rekenbeleidsregels voor een Data Lake Analytics-account opgehaald.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Een nieuw rekenbeleid maken

Met de volgende code maakt u een nieuw rekenbeleid voor een Data Lake Analytics-account, waarbij het maximum aantal BESCHIKBARE EENHEDEN voor de opgegeven gebruiker wordt ingesteld op 50 en de minimale taakprioriteit op 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Volgende stappen