Mulai Cepat: Membuat pabrik data dan alur menggunakan Python

BERLAKU UNTUK:Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Dalam mulai cepat ini, Anda membuat pabrik data dengan menggunakan Python. Alur dalam pabrik data ini menyalin data dari satu folder ke folder lain dalam penyimpanan Azure Blob.

Azure Data Factory adalah layanan integrasi data berbasis cloud yang memungkinkan Anda membuat alur kerja berbasis data untuk mengatur dan mengotomatiskan pemindahan data dan transformasi data. Dengan menggunakan Azure Data Factory, Anda dapat membuat dan menjadwalkan alur kerja berbasis data, yang disebut alur.

Alur dapat menyerap data dari penyimpanan data yang berbeda. Alur memproses atau mengubah data dengan menggunakan layanan komputasi seperti Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, dan Azure Machine Learning. Alur menerbitkan data output ke penyimpanan data seperti aplikasi Azure Synapse Analytics untuk kecerdasan bisnis (BI).

Prasyarat

  • Akun Azure dengan langganan aktif. Buat akun gratis.

  • Python 3.6+.

  • Akun Azure Storage.

  • Penjelajah Azure Storage (opsional).

  • Aplikasi di MICROSOFT Entra ID. Buat aplikasi dengan mengikuti langkah-langkah di tautan ini, menggunakan Opsi Autentikasi 2 (rahasia aplikasi), dan tetapkan aplikasi ke peran Kontributor dengan mengikuti petunjuk di artikel yang sama. Catat nilai berikut seperti yang ditampilkan dalam artikel untuk digunakan di langkah selanjutnya: ID aplikasi (klien), nilai rahasia klien, dan ID tenant.

Membuat dan mengunggah file input

  1. Luncurkan Notepad. Salin teks berikut dan simpan sebagai file input.txt di disk Anda.

    John|Doe
    Jane|Doe
    
  2. Gunakan alat seperti Azure Storage Explorer untuk membuat kontainer adfv2tutorial, dan folder input dalam kontainer. Lalu, unggah file input.txt ke folder input.

Menginstal paket Python

  1. Buka terminal atau perintah dengan hak istimewa admin. 

  2. Pertama, instal paket Python untuk sumber daya pengelolaan Azure:

    pip install azure-mgmt-resource
    
  3. Untuk menginstal paket Python untuk Data Factory, jalankan perintah berikut:

    pip install azure-mgmt-datafactory
    

    Python SDK untuk Data Factory mendukung Python 2.7 dan 3.6+.

  4. Untuk menginstal paket Python untuk autentikasi Azure Identity, jalankan perintah berikut:

    pip install azure-identity
    

    Catatan

    Paket "azure-identity" mungkin memiliki konflik dengan "azure-cli" pada beberapa dependensi umum. Jika Anda memenuhi masalah autentikasi apa pun, hapus "azure-cli" dan dependensinya, atau gunakan mesin bersih tanpa menginstal paket "azure-cli" untuk membuatnya berfungsi. Untuk Sovereign cloud, Anda harus menggunakan konstanta khusus cloud yang sesuai. Harap lihat Menghubungkan ke semua wilayah menggunakan pustaka Azure untuk Python Multi-cloud | Microsoft Docs untuk mendapatkan petunjuk guna terhubung dengan Python di Sovereign cloud.

Membuat klien pabrik data

  1. Buat file bernama datafactory.py. Tambahkan pernyataan berikut untuk menambahkan referensi ke namespace.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Tambahkan fungsi berikut yang mencetak informasi.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Tambahkan kode berikut ke metode Utama yang membuat instans kelas DataFactoryManagementClient. Anda menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga menggunakan obyek ini untuk memantau detail eksekusi alur. Atur variabel subscription_id ke ID langganan Azure Anda. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Membuat pabrik data

Tambahkan kode berikut ke metode Utama yang membuat pabrik data. Jika grup sumber daya Anda sudah ada, komentari pernyataan create_or_update pertama.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Membuat layanan tertaut

Tambahkan kode berikut ke metode Utama yang membuat layanan tertaut Azure Storage.

Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Dalam mulai cepat ini, Anda hanya perlu membuat satu layanan tertaut Azure Storage sebagai sumber salinan dan penyimpanan sink, bernama "AzureStorageLinkedService" dalam sampel. Ganti <storageaccountname> dan <storageaccountkey> dengan nama dan kunci akun Azure Storage Anda.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Membuat himpunan data

Di bagian ini, Anda membuat dua himpunan data: satu untuk sumber dan yang lain untuk sink.

Membuat himpunan data untuk Azure Blob sumber

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Membuat himpunan data untuk Azure Blob sink

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Buat alur

Tambahkan kode berikut ke metode Utama yang membuat alur dengan aktivitas salin.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Membuat eksekusi alur

Tambahkan kode berikut ke metode Utama yang memicu eksekusi alur.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Memantau eksekusi alur

Untuk memantau eksekusi alur, tambahkan kode berikut ke metode Utama:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Sekarang, tambahkan pernyataan berikut untuk memanggil metode Utama saat program dijalankan:

# Start the main method
main()

Skrip lengkap

Berikut kode Python lengkapnya:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Menjalankan kode

Buat dan mulai aplikasi, lalu verifikasi eksekusi alur.

Konsol mencetak kemajuan pembuatan pabrik data, layanan tertaut, himpunan data, alur, dan eksekusi alur. Tunggu hingga Anda melihat detail eksekusi aktivitas penyalinan dengan data ukuran yang dibaca/ditulis. Kemudian, gunakan alat seperti Penjelajah Microsoft Azure Storage untuk memeriksa blob disalin ke "outputBlobPath" dari "inputBlobPath" seperti yang Anda tentukan dalam variabel.

Berikut adalah output sampel:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Membersihkan sumber daya

Untuk menghapus pabrik data, tambahkan kode berikut ke program:

adf_client.factories.delete(rg_name, df_name)

Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Ikuti tutorial untuk mempelajari tentang penggunaan Data Factory dalam skenario lainnya.