Share via


Python kullanarak Azure Data Lake Analytics yönetme

Önemli

Azure Data Lake Analytics 29 Şubat 2024'te kullanımdan kaldırıldı. Bu duyuru ile daha fazla bilgi edinin.

Veri analizi için kuruluşunuz Azure Synapse Analytics veya Microsoft Fabric kullanabilir.

Bu makalede Azure Data Lake Analytics hesaplarının, veri kaynaklarının, kullanıcıların ve işlerin Python kullanılarak nasıl yönetileceğini açıklanmaktadır.

Desteklenen Python sürümleri

  • Python'ın 64 bit sürümünü kullanın.
  • Python.org indirmelerinde bulunan standart Python dağıtımını kullanabilirsiniz.
  • Birçok geliştirici Anaconda Python dağıtımını kullanmayı kolaylaştırır.
  • Bu makale, standart Python dağıtımından Python sürüm 3.6 kullanılarak yazılmıştır

Azure Python SDK’yı yükleme

Aşağıdaki modülleri yükleyin:

  • azure-mgmt-resource modülü, Active Directory için diğer Azure modüllerini vb. içerir.
  • azure-datalake-store modülü, Azure Data Lake Store dosya sistemi işlemlerini içerir.
  • azure-mgmt-datalake-store modülü, Azure Data Lake Store hesap yönetimi işlemlerini içerir.
  • azure-mgmt-datalake-analytics modülü, Azure Data Lake Analytics işlemlerini içerir.

İlk olarak, aşağıdaki komutu çalıştırarak en son pip sürüme sahip olduğunuzdan emin olun:

python -m pip install --upgrade pip

Bu belge kullanılarak pip version 9.0.1yazılmıştır.

Komut satırından modülleri yüklemek için aşağıdaki pip komutları kullanın:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Yeni python betiği oluşturma

Betiğin içine aşağıdaki kodu yapıştırın:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Modüllerin içeri aktarılabildiğini doğrulamak için bu betiği çalıştırın.

Kimlik Doğrulaması

Açılır pencere ile etkileşimli kullanıcı kimlik doğrulaması

Bu yöntem desteklenmez.

Cihaz koduyla etkileşimli kullanıcı kimlik doğrulaması

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

SPI ve gizli dizi ile etkileşimsiz kimlik doğrulaması

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

API ve sertifika ile etkileşimsiz kimlik doğrulaması

Bu yöntem desteklenmez.

Ortak betik değişkenleri

Bu değişkenler örneklerde kullanılır.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

İstemcileri oluşturma

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Azure Kaynak Grubu oluşturma

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Data Lake Analytics hesabı oluşturma

İlk olarak bir mağaza hesabı oluşturun.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Ardından bu mağazayı kullanan bir ADLA hesabı oluşturun.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

İş gönderme

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

bir işin bitmesini bekleme

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

İşlem hatlarını ve yinelenmeleri listeleme

İşlerinizde işlem hattı veya yineleme meta verilerinin eklenip eklenmediğine bağlı olarak, işlem hatlarını ve yinelemeleri listeleyebilirsiniz.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

İşlem ilkelerini yönetme

DataLakeAnalyticsAccountManagementClient nesnesi, bir Data Lake Analytics hesabı için işlem ilkelerini yönetmek için yöntemler sağlar.

İşlem ilkelerini listeleme

Aşağıdaki kod, bir Data Lake Analytics hesabı için işlem ilkelerinin listesini alır.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Yeni işlem ilkesi oluşturma

Aşağıdaki kod, bir Data Lake Analytics hesabı için yeni bir işlem ilkesi oluşturur ve belirtilen kullanıcının kullanabileceği en yüksek AU sayısını 50 ve en düşük iş önceliğini 250 olarak ayarlar.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Sonraki adımlar