Verwalten von Azure Data Lake Analytics mithilfe von Python

In diesem Artikel wird beschrieben, wie Sie Azure Data Lake Analytics-Konten, -Datenquellen, -Benutzer und -Aufträge mit Python verwalten.

Unterstützte Python-Versionen

  • Verwenden Sie eine 64-Bit-Version von Python.
  • Sie können die Python-Standarddistribution unter Python.org downloads verwenden.
  • Viele Entwickler bevorzugen die Verwendung der Python-Distribution Anaconda.
  • Für diesen Artikel wurde die Python-Version 3.6 der Python-Standarddistribution verwendet.

Installieren des Azure Python SDK

Installieren Sie die folgenden Module:

  • Das Modul azure-mgmt-resource enthält andere Azure-Module, z.B. für Active Directory.
  • Das Modul azure-datalake-store umfasst die Vorgänge für das Azure Data Lake Store-Dateisystem.
  • Das Modul azure-mgmt-datalake-store beinhaltet Kontoverwaltungsvorgänge für Azure Data Lake Store.
  • Das Modul azure-mgmt-datalake-analytics beinhaltet die Vorgänge für Azure Data Lake Analytics.

Vergewissern Sie sich zunächst, dass die aktuelle Version von pip verwendet wird, indem Sie folgenden Befehl ausführen:

python -m pip install --upgrade pip

Für dieses Dokument wurde pip version 9.0.1 verwendet.

Verwenden Sie die folgenden pip-Befehle, um die Module über die Befehlszeile zu installieren:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Erstellen eines Python-Skripts

Fügen Sie den folgenden Code in das Skript ein:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Führen Sie dieses Skript aus, um zu überprüfen, ob die Module importiert werden können.

Authentifizierung

Interaktive Benutzerauthentifizierung mit einem Popup

Diese Methode wird nicht unterstützt.

Interaktive Benutzerauthentifizierung mit einem Gerätecode

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Nicht interaktive Authentifizierung mit SPI und einem Geheimnis

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Nicht interaktive Authentifizierung mit API und einem Zertifikat

Diese Methode wird nicht unterstützt.

Allgemeine Skriptvariablen

Diese Variablen werden in den Beispielen verwendet.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Erstellen der Clients

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Erstellen einer Azure-Ressourcengruppe

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Erstellen eines Data Lake Analytics-Kontos

Erstellen Sie zunächst ein Speicherkonto.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Erstellen Sie dann ein ADLA-Konto, das diesen Speicher verwendet.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Übermitteln eines Auftrags

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Warten auf den Abschluss eines Auftrags

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Auflisten von Pipelines und Wiederholungen

Abhängig davon, ob an Ihre Aufträge Pipeline- oder Wiederholungsmetadaten angefügt sind, können Sie Pipelines und Wiederholungen auflisten.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Verwalten von Computerichtlinien

Das DataLakeAnalyticsAccountManagementClient-Objekt enthält Methoden zum Verwalten der Computerichtlinien für ein Data Lake Analytics-Konto.

Auflisten von Computerichtlinien

Mit dem folgenden Code wird eine Liste von Computerichtlinien für ein Data Lake Analytics-Konto abgerufen.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Erstellen einer Computerichtlinie

Mit dem folgenden Code wird eine neue Computerichtlinie für ein Data Lake Analytics-Konto erstellt, mit der die maximal verfügbaren Analytics-Einheiten für den angegebenen Benutzer auf 50 und die minimale Auftragspriorität auf 250 festgelegt wird.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Nächste Schritte