Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
ŞUNLARA UYGULANIR:
Azure Data Factory
Azure Synapse Analytics
İpucu
Microsoft Fabric'daki
Bu hızlı başlangıçta, Python kullanarak bir veri fabrikası oluşturacaksınız. Bu veri fabrikasındaki pipeline, Azure blob depolama hizmetinde bir klasördeki verileri başka bir klasöre kopyalar.
Azure Data Factory, veri taşımayı ve veri dönüştürmeyi düzenlemeye ve otomatikleştirmeye yönelik veri odaklı iş akışları oluşturmanıza olanak tanıyan bulut tabanlı bir veri tümleştirme hizmetidir. Azure Data Factory kullanarak işlem hatları olarak adlandırılan veri temelli iş akışları oluşturabilir ve zamanlayabilirsiniz.
İşlem hatları, farklı veri depolarından veri alabilir. İşlem hatları Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics ve Azure Machine Learning gibi işlem hizmetlerini kullanarak verileri işler veya dönüştürür. İşlem hatları, çıktı verilerini iş zekası (BI) uygulamaları için Azure Synapse Analytics gibi veri depolarına yayımlar.
Önkoşullar
Etkin aboneliği olan bir Azure hesabı. Ücretsiz bir tane oluşturun.
Azure Storage Explorer (isteğe bağlı).
Microsoft Entra ID'da bir uygulama. Bu bağlantıdaki adımları izleyerek, Kimlik Doğrulama Seçeneği 2'yi (uygulama gizli dizisi) kullanarak uygulamayı oluşturun ve aynı makaledeki yönergeleri izleyerek uygulamayı Katkıda Bulunan rolüne atayın. Sonraki adımlarda kullanmak üzere makalede gösterildiği gibi aşağıdaki değerleri not edin: Uygulama (istemci) kimliği, istemci gizli anahtarı değeri ve kiracı kimliği.
Giriş dosyası oluşturma ve yükleme
Not Defteri'ni başlatın. Aşağıdaki metni kopyalayıp diskinizde input.txt dosyası olarak kaydedin.
John|Doe Jane|Doekapsayıcıda adfv2tutorial kapsayıcısını ve input klasörünü oluşturmak için Azure Storage Explorer gibi araçları kullanın. Sonra, input.txt dosyasını input klasörüne yükleyin.
Python paketini yükleme
Yönetici ayrıcalıklarıyla bir terminal veya komut istemi açın.
İlk olarak, Azure yönetim kaynakları için Python paketini yükleyin:
pip install azure-mgmt-resourceData Factory için Python paketini yüklemek için aşağıdaki komutu çalıştırın:
pip install azure-mgmt-datafactoryData Factory için
Python SDK'sı Python 2.7 ve 3.6+ sürümünü destekler. Azure Kimlik kimlik doğrulaması için Python paketini yüklemek için aşağıdaki komutu çalıştırın:
pip install azure-identityNot
"azure-identity" paketinde bazı yaygın bağımlılıklarda "azure-cli" ile çakışmalar olabilir. Herhangi bir kimlik doğrulama sorununu karşılıyorsanız , "azure-cli" ve bağımlılıklarını kaldırın veya "azure-cli" paketini yüklemeden temiz bir makine kullanarak bu paketin çalışmasını sağlayın. Bağımsız bulutlar için uygun buluta özgü sabitleri kullanmanız gerekir. Python için Azure kitaplıkları kullanarak tüm bölgelere bağlanma | Souvereign clouds'da Python ile bağlantı kurma yönergeleri için Microsoft Docs'a bakın .
Veri fabrikası istemcisi oluşturma
datafactory.py adlı bir dosya oluşturun. Ad alanlarına başvurular eklemek için aşağıdaki ifadeleri ekleyin.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeBilgileri yazdıran aşağıdaki işlevleri ekleyin.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Aşağıdaki kodu DataFactoryManagementClient sınıfının bir örneğini oluşturan Main yöntemine ekleyin. Veri fabrikası, bağlı hizmet, veri kümeleri ve işlem hattı oluşturmak için bu nesneyi kullanırsınız. Bu nesneyi ayrıca işlem hattı ayrıntılarını izlemek için kullanabilirsiniz. subscription_id değişkenini Azure aboneliğinizin kimliğine ayarlayın. Data Factory'nin şu anda kullanılabilir olduğu Azure bölgelerin listesi için aşağıdaki sayfada ilginizi çekebilecek bölgeleri seçin ve ardından Analytics'yi genişleterek Data Factory: Products by region öğesini bulun. Veri fabrikası tarafından kullanılan veri depoları (Azure Storage, Azure SQL Database vb.) ve işlem (HDInsight vb.) diğer bölgelerde olabilir.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Veri fabrikası oluşturma
Aşağıdaki kodu veri fabrikası oluşturan Main yöntemine ekleyin. Kaynak grubunuz zaten varsa, birinci create_or_update deyimine açıklama ekleyin.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Bağlı hizmet oluşturma
Azure Storage bağlı hizmeti oluşturan Main yöntemine aşağıdaki kodu ekleyin.
Veri depolarınızı ve işlem hizmetlerinizi veri fabrikasına bağlamak için veri fabrikasında bağlı hizmetler oluşturursunuz. Bu hızlı başlangıçta, örnekte "AzureStorageLinkedService" adlı, hem kopyalama kaynağı hem de hedef depolama olarak kullanabileceğiniz yalnızca bir Azure Storage bağlı hizmeti oluşturmanız gerekir.
<storageaccountname> ve <storageaccountkey> Azure Storage hesabınızın adı ve anahtarıyla değiştirin.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Veri kümeleri oluşturma
Bu bölümde biri kaynak, diğeri havuz için olmak üzere iki veri kümesi oluşturacaksınız.
Kaynak Azure Blobu için veri kümesi oluşturma
Azure blob veri kümesi oluşturan Main yöntemine aşağıdaki kodu ekleyin. Azure Blob veri kümesinin özellikleri hakkında bilgi için Azure blob bağlayıcısı makalesine bakın.
Azure Blob'daki kaynak verileri temsil eden bir veri kümesi tanımlarsınız. Bu Blob veri kümesi, önceki adımda oluşturduğunuz Azure Storage bağlı hizmeti ifade eder.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Havuz Azure Blobu için veri kümesi oluşturma
Azure blob veri kümesi oluşturan Main yöntemine aşağıdaki kodu ekleyin. Azure Blob veri kümesinin özellikleri hakkında bilgi için Azure blob bağlayıcısı makalesine bakın.
Azure Blob'daki kaynak verileri temsil eden bir veri kümesi tanımlarsınız. Bu Blob veri kümesi, önceki adımda oluşturduğunuz Azure Storage bağlı hizmeti ifade eder.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Bir işlem hattı oluştur
Main yöntemine bir kopyalama etkinliği içeren bir işlem hattı oluşturan aşağıdaki kodu ekleyin.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
İşlem hattı çalıştırmasını oluştur
Aşağıdaki kodu bir işlem hattı çalıştırması tetikleyenMain yöntemine ekleyin.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Boru hattı çalışmasını izleme
İşlem hattını izlemek için, aşağıdaki kodu Main yöntemine ekleyin:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Şimdi, program çalıştırıldığında main yöntemini çağırmak için aşağıdaki deyimi ekleyin:
# Start the main method
main()
Tam metin
Tam Python kodu aşağıdadır:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Kodu çalıştırma
Uygulamayı derleyip başlatın, ardından işlem hattı çalıştırılmasını doğrulayın.
Konsol; veri fabrikası, ilişkili hizmet, veri kümeleri, işlem hattı ve işlem hattı çalıştırmasının ilerleme durumunu yazdırır. Okunan/yazılan veri boyutunu içeren kopyalama etkinliği ayrıntılarını görene kadar bekleyin. Ardından, Azure Storage explorer gibi araçları kullanarak blobların değişkenlerde belirttiğiniz gibi "inputBlobPath" öğesinden "outputBlobPath"e kopyalandığından denetleyin.
Örnek çıktı aşağıdaki gibidir:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Kaynakları temizleme
Veri fabrikasını silmek için aşağıdaki kodu programa ekleyin:
adf_client.factories.delete(rg_name, df_name)
İlgili içerik
Bu örnekteki işlem hattı verileri bir konumdan Azure blob depolama alanındaki başka bir konuma kopyalar. Daha fazla senaryoda Data Factory’yi kullanma hakkında bilgi almak için öğreticileri okuyun.