Megosztás a következőn keresztül:


Az Azure Data Lake Analytics kezelése a Python használatával

Fontos

Az Azure Data Lake Analytics 2024. február 29-én megszűnt. További információ ezzel a bejelentéssel.

Az adatelemzéshez a szervezet használhatja a Azure Synapse Analyticset vagy a Microsoft Fabricet.

Ez a cikk azt ismerteti, hogyan kezelheti az Azure-Data Lake Analytics-fiókokat, -adatforrásokat, -felhasználókat és -feladatokat a Python használatával.

Támogatott Python-verziók

  • Használja a Python 64 bites verzióját.
  • A letöltések Python.org található standard Python-disztribúciót használhatja.
  • Sok fejlesztő kényelmesnek találja az Anaconda Python-disztribúció használatát.
  • Ez a cikk a Python standard Python-disztribúciójának 3.6-os verziójával készült

Az Azure Python SDK telepítése

Telepítse a következő modulokat:

  • Az azure-mgmt-resource modul más Azure-modulokat is tartalmaz az Active Directoryhoz stb.
  • Az azure-datalake-store modul tartalmazza az Azure Data Lake Store fájlrendszerműveleteket.
  • Az azure-mgmt-datalake-store modul tartalmazza az Azure Data Lake Store fiókkezelési műveleteit.
  • Az azure-mgmt-datalake-analytics modul tartalmazza az Azure Data Lake Analytics műveleteket.

Először győződjön meg arról, hogy a legújabb pip verzióval rendelkezik a következő parancs futtatásával:

python -m pip install --upgrade pip

Ezt a dokumentumot a használatával pip version 9.0.1írták.

A következő pip parancsokkal telepítheti a modulokat a parancssorból:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Új Python-szkript létrehozása

Illessze be a következő kódot a szkriptbe:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Futtassa ezt a szkriptet annak ellenőrzéséhez, hogy a modulok importálhatók-e.

Hitelesítés

Interaktív felhasználói hitelesítés előugró ablakkal

Ez a módszer nem támogatott.

Interaktív felhasználói hitelesítés eszközkóddal

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Neminteraktív hitelesítés SPI-vel és titkos kóddal

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Nem interaktív hitelesítés API-val és tanúsítvánnyal

Ez a módszer nem támogatott.

Gyakori szkriptváltozók

Ezeket a változókat a mintákban használjuk.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Az ügyfelek létrehozása

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Azure-erőforráscsoport létrehozása

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Data Lake Analytics-fiók létrehozása

Először hozzon létre egy áruházfiókot.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Ezután hozzon létre egy ADLA-fiókot, amely ezt a tárolót használja.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Feladat elküldése

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Várjon, amíg egy feladat befejeződik

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Folyamatok és ismétlődések listázása

Attól függően, hogy a feladatokhoz folyamat vagy ismétlődési metaadatok vannak csatolva, listázhatja a folyamatokat és az ismétlődéseket.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Számítási szabályzatok kezelése

A DataLakeAnalyticsAccountManagementClient objektum metódusokat biztosít egy Data Lake Analytics-fiók számítási szabályzatainak kezeléséhez.

Számítási szabályzatok listázása

Az alábbi kód lekéri egy Data Lake Analytics-fiók számítási szabályzatainak listáját.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Új számítási szabályzat létrehozása

Az alábbi kód egy új számítási szabályzatot hoz létre egy Data Lake Analytics-fiókhoz, a megadott felhasználó számára elérhető maximális AU-t 50-re, a minimális feladatprioritást pedig 250-re állítja.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Következő lépések