快速入門:使用 Python 建立資料處理站和管線

適用于: Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用于企業的單一分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版

在本快速入門中,您會使用 Python 建立資料處理站。 此資料處理站中的管線會將資料從一個資料夾複製到 Azure Blob 儲存體中的另一個資料夾。

Azure Data Factory 是雲端式資料整合服務,可讓您建立資料驅動工作流程,以協調及自動化資料移動和資料轉換。 使用 Azure Data Factory,您可以建立和排程稱為管線的資料驅動工作流程。

管線可以從不同的資料存放區擷取資料。 管線會使用 Azure HDInsight Hadoop、Spark、Azure Data Lake Analytics 和 Azure 機器學習等計算服務來處理或轉換資料。 管線會將輸出資料發佈至資料存放區,例如適用于商業智慧 (BI) 應用程式的 Azure Synapse Analytics。

必要條件

  • 具有有效訂用帳戶的 Azure 帳戶。 免費建立一個

  • Python 3.6+

  • Azure 儲存體帳戶

  • Azure 儲存體總管 (選擇性)。

  • Microsoft Entra ID 中的應用程式。 依照此連結中的步驟建立應用程式,方法是使用驗證選項 2(應用程式密碼),並遵循相同文章中的指示,將應用程式指派給 參與者 角色。 記下下列值,如後續步驟中要使用的文章所示: 應用程式(用戶端)識別碼、用戶端密碼值和租使用者識別碼。

建立和上傳輸入檔案

  1. 啟動記事本。 複製下列文字,並將其儲存為 磁片上的 input.txt 檔案。

    John|Doe
    Jane|Doe
    
  2. 使用 Azure 儲存體 Explorer 之類的 工具,在容器中建立 adfv2tutorial 容器和 輸入 資料夾。 然後,將 input.txt 檔案上傳至 輸入 資料夾。

安裝 Python 套件

  1. 使用系統管理員許可權開啟終端機或命令提示字元。 

  2. 首先,安裝適用于 Azure 管理資源的 Python 套件:

    pip install azure-mgmt-resource
    
  3. 若要安裝 Data Factory 的 Python 套件,請執行下列命令:

    pip install azure-mgmt-datafactory
    

    適用于 Data Factory Python SDK 支援 Python 2.7 和 3.6+。

  4. 若要安裝適用于 Azure 身分識別驗證的 Python 套件,請執行下列命令:

    pip install azure-identity
    

    注意

    在某些常見的相依性上,「azure-identity」 套件可能與 「azure-cli」 衝突。 如果您遇到任何驗證問題,請移除 「azure-cli」 及其相依性,或使用全新電腦而不安裝 「azure-cli」 套件,使其正常運作。 針對主權雲端,您必須使用適當的雲端特定常數。 請參閱 使用適用于 Python 多雲端的 Azure 程式庫連線至所有區域 |Microsoft Docs 提供在主權雲端中與 Python 連線的指示。

建立資料處理站用戶端

  1. 建立名為 datafactory.py 的 檔案。 新增下列語句以新增命名空間的參考。

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. 新增下列列印資訊的函式。

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. 將下列程式碼新增至 Main 方法,以建立 DataFactoryManagementClient 類別的實例。 您可以使用這個物件來建立資料處理站、連結服務、資料集和管線。 您也可以使用此物件來監視管線執行詳細資料。 將 subscription_id 變數設定 為 Azure 訂用帳戶的識別碼。 如需 Data Factory 目前可用的 Azure 區域清單,請選取您在下列頁面上感興趣的區域,然後展開 [分析 ] 以找出 Data Factory 依區域 提供的產品。 資料處理站所使用的資料存放區(Azure 儲存體、Azure SQL 資料庫等)和計算(HDInsight 等)可以位於其他區域。

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

建立資料處理站

將下列程式碼新增至 建立資料處理站 的 Main 方法 。 如果您的資源群組已經存在,請批註化第一個 create_or_update 語句。

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

建立連結服務

將下列程式碼新增至 Main 方法,以建立 Azure 儲存體連結服務

您可以在資料處理站中建立連結服務,以將您的資料存放區和計算服務連結至資料處理站。 在本快速入門中,您只需要在範例中建立一個Azure 儲存體連結服務作為複製來源和接收存放區,名為 「Azure儲存體LinkedService」。 將 和 <storageaccountkey> 取代 <storageaccountname> 為您Azure 儲存體帳戶的名稱和金鑰。

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

建立資料集

在本節中,您會建立兩個資料集:一個用於來源,另一個用於接收。

建立來源 Azure Blob 的資料集

將下列程式碼新增至建立 Azure Blob 資料集的 Main 方法。 如需 Azure Blob 資料集屬性的相關資訊,請參閱 Azure Blob 連接器 一文。

您可以定義代表 Azure Blob 中來源資料的資料集。 此 Blob 資料集是指您在上一個步驟中建立Azure 儲存體連結服務。

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

建立接收 Azure Blob 的資料集

將下列程式碼新增至建立 Azure Blob 資料集的 Main 方法。 如需 Azure Blob 資料集屬性的相關資訊,請參閱 Azure Blob 連接器 一文。

您可以定義代表 Azure Blob 中來源資料的資料集。 此 Blob 資料集是指您在上一個步驟中建立Azure 儲存體連結服務。

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

建立新管線

將下列程式碼新增至 Main 方法,以建立 具有複製活動的 管線。

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

建立管線執行

將下列程式碼新增至 觸發管線執行的 Main 方法

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

監視管線執行

若要監視管線執行,請新增下列程式碼 Main 方法:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

現在,新增下列語句,以在執行程式時叫 用 main 方法:

# Start the main method
main()

完整指令碼

以下是完整的 Python 程式碼:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

執行程式碼

建置並啟動應用程式,然後確認管線執行。

主控台會列印建立資料處理站、連結服務、資料集、管線和管線執行的進度。 等到您看到複製活動執行詳細資料,以及讀取/寫入的資料大小。 然後,使用Azure 儲存體總 管之類的 工具,檢查 blob 從 「inputBlobPath」 複製到 「outputBlobPath」,如您在變數中指定的。

以下是範例輸出:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

清除資源

若要刪除資料處理站,請將下列程式碼新增至程式:

adf_client.factories.delete(rg_name, df_name)

此範例中的管線會將資料從一個位置複製到 Azure Blob 儲存體中的另一個位置。 請流覽教學 課程 ,以瞭解在更多案例中使用 Data Factory。