Dela via


Hantera Azure Data Lake Analytics med Python

Viktigt

Azure Data Lake Analytics drog sig tillbaka den 29 februari 2024. Läs mer med det här meddelandet.

För dataanalys kan din organisation använda Azure Synapse Analytics eller Microsoft Fabric.

Den här artikeln beskriver hur du hanterar Azure Data Lake Analytics-konton, datakällor, användare och jobb med hjälp av Python.

Python-versioner som stöds

  • Använd en 64-bitarsversion av Python.
  • Du kan använda python-standarddistributionen som finns i Python.org nedladdningar.
  • Många utvecklare tycker att det är praktiskt att använda Anaconda Python-distributionen.
  • Den här artikeln skrevs med Python version 3.6 från python-standarddistributionen

Installera Azure Python SDK

Installera följande moduler:

  • Modulen azure-mgmt-resource innehåller andra Azure-moduler för Active Directory osv.
  • Modulen azure-datalake-store innehåller filsystemsåtgärderna i Azure Data Lake Store.
  • Modulen azure-mgmt-datalake-store innehåller azure Data Lake Store-kontohanteringsåtgärder.
  • Modulen azure-mgmt-datalake-analytics innehåller Azure Data Lake Analytics åtgärder.

Kontrollera först att du har det senaste pip genom att köra följande kommando:

python -m pip install --upgrade pip

Det här dokumentet skrevs med .pip version 9.0.1

Använd följande pip kommandon för att installera modulerna från kommandoraden:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Skapa ett nytt Python-skript

Klistra in följande kod i skriptet:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Kör det här skriptet för att kontrollera att modulerna kan importeras.

Autentisering

Interaktiv användarautentisering med ett popup-fönster

Den här metoden stöds inte.

Interaktiv användarautentisering med en enhetskod

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Icke-interaktiv autentisering med SPI och en hemlighet

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Icke-interaktiv autentisering med API och ett certifikat

Den här metoden stöds inte.

Vanliga skriptvariabler

Dessa variabler används i exemplen.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Skapa klienterna

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Skapa en Azure-resursgrupp

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Skapa ett Data Lake Analytics-konto

Skapa först ett butikskonto.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Skapa sedan ett ADLA-konto som använder det arkivet.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Skicka ett jobb

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Vänta tills ett jobb har avslutats

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Lista pipelines och upprepningar

Beroende på om dina jobb har pipeline- eller upprepningsmetadata kopplade kan du lista pipelines och upprepningar.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Hantera beräkningsprinciper

Objektet DataLakeAnalyticsAccountManagementClient innehåller metoder för att hantera beräkningsprinciperna för ett Data Lake Analytics konto.

Lista beräkningsprinciper

Följande kod hämtar en lista över beräkningsprinciper för ett Data Lake Analytics konto.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Skapa en ny beräkningsprincip

Följande kod skapar en ny beräkningsprincip för ett Data Lake Analytics konto, som anger maximalt antal tillgängliga AUS för den angivna användaren till 50 och den minsta jobbprioriteten till 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Nästa steg