快速入門:使用 Python 建立資料處理站和管線
適用于: Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用于企業的單一分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版 !
在本快速入門中,您會使用 Python 建立資料處理站。 此資料處理站中的管線會將資料從一個資料夾複製到 Azure Blob 儲存體中的另一個資料夾。
Azure Data Factory 是雲端式資料整合服務,可讓您建立資料驅動工作流程,以協調及自動化資料移動和資料轉換。 使用 Azure Data Factory,您可以建立和排程稱為管線的資料驅動工作流程。
管線可以從不同的資料存放區擷取資料。 管線會使用 Azure HDInsight Hadoop、Spark、Azure Data Lake Analytics 和 Azure 機器學習等計算服務來處理或轉換資料。 管線會將輸出資料發佈至資料存放區,例如適用于商業智慧 (BI) 應用程式的 Azure Synapse Analytics。
必要條件
具有有效訂用帳戶的 Azure 帳戶。 免費建立一個。
Azure 儲存體總管 (選擇性)。
Microsoft Entra ID 中的應用程式。 依照此連結中的步驟建立應用程式,方法是使用驗證選項 2(應用程式密碼),並遵循相同文章中的指示,將應用程式指派給 參與者 角色。 記下下列值,如後續步驟中要使用的文章所示: 應用程式(用戶端)識別碼、用戶端密碼值和租使用者識別碼。
建立和上傳輸入檔案
啟動記事本。 複製下列文字,並將其儲存為 磁片上的 input.txt 檔案。
John|Doe Jane|Doe
使用 Azure 儲存體 Explorer 之類的 工具,在容器中建立 adfv2tutorial 容器和 輸入 資料夾。 然後,將 input.txt 檔案上傳至 輸入 資料夾。
安裝 Python 套件
使用系統管理員許可權開啟終端機或命令提示字元。
首先,安裝適用于 Azure 管理資源的 Python 套件:
pip install azure-mgmt-resource
若要安裝 Data Factory 的 Python 套件,請執行下列命令:
pip install azure-mgmt-datafactory
適用于 Data Factory 的 Python SDK 支援 Python 2.7 和 3.6+。
若要安裝適用于 Azure 身分識別驗證的 Python 套件,請執行下列命令:
pip install azure-identity
注意
在某些常見的相依性上,「azure-identity」 套件可能與 「azure-cli」 衝突。 如果您遇到任何驗證問題,請移除 「azure-cli」 及其相依性,或使用全新電腦而不安裝 「azure-cli」 套件,使其正常運作。 針對主權雲端,您必須使用適當的雲端特定常數。 請參閱 使用適用于 Python 多雲端的 Azure 程式庫連線至所有區域 |Microsoft Docs 提供在主權雲端中與 Python 連線的指示。
建立資料處理站用戶端
建立名為 datafactory.py 的 檔案。 新增下列語句以新增命名空間的參考。
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time
新增下列列印資訊的函式。
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))
將下列程式碼新增至 Main 方法,以建立 DataFactoryManagementClient 類別的實例。 您可以使用這個物件來建立資料處理站、連結服務、資料集和管線。 您也可以使用此物件來監視管線執行詳細資料。 將 subscription_id 變數設定 為 Azure 訂用帳戶的識別碼。 如需 Data Factory 目前可用的 Azure 區域清單,請選取您在下列頁面上感興趣的區域,然後展開 [分析 ] 以找出 Data Factory : 依區域 提供的產品。 資料處理站所使用的資料存放區(Azure 儲存體、Azure SQL 資料庫等)和計算(HDInsight 等)可以位於其他區域。
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Soverign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
建立資料處理站
將下列程式碼新增至 建立資料處理站 的 Main 方法 。 如果您的資源群組已經存在,請批註化第一個 create_or_update
語句。
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
建立連結服務
將下列程式碼新增至 Main 方法,以建立 Azure 儲存體連結服務 。
您可以在資料處理站中建立連結服務,以將您的資料存放區和計算服務連結至資料處理站。 在本快速入門中,您只需要在範例中建立一個Azure 儲存體連結服務作為複製來源和接收存放區,名為 「Azure儲存體LinkedService」。 將 和 <storageaccountkey>
取代 <storageaccountname>
為您Azure 儲存體帳戶的名稱和金鑰。
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
建立資料集
在本節中,您會建立兩個資料集:一個用於來源,另一個用於接收。
建立來源 Azure Blob 的資料集
將下列程式碼新增至建立 Azure Blob 資料集的 Main 方法。 如需 Azure Blob 資料集屬性的相關資訊,請參閱 Azure Blob 連接器 一文。
您可以定義代表 Azure Blob 中來源資料的資料集。 此 Blob 資料集是指您在上一個步驟中建立Azure 儲存體連結服務。
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
建立接收 Azure Blob 的資料集
將下列程式碼新增至建立 Azure Blob 資料集的 Main 方法。 如需 Azure Blob 資料集屬性的相關資訊,請參閱 Azure Blob 連接器 一文。
您可以定義代表 Azure Blob 中來源資料的資料集。 此 Blob 資料集是指您在上一個步驟中建立Azure 儲存體連結服務。
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
建立新管線
將下列程式碼新增至 Main 方法,以建立 具有複製活動的 管線。
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
建立管線執行
將下列程式碼新增至 觸發管線執行的 Main 方法 。
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
監視管線執行
若要監視管線執行,請新增下列程式碼 Main 方法:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
現在,新增下列語句,以在執行程式時叫 用 main 方法:
# Start the main method
main()
完整指令碼
以下是完整的 Python 程式碼:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
執行程式碼
建置並啟動應用程式,然後確認管線執行。
主控台會列印建立資料處理站、連結服務、資料集、管線和管線執行的進度。 等到您看到複製活動執行詳細資料,以及讀取/寫入的資料大小。 然後,使用Azure 儲存體總 管之類的 工具,檢查 blob 從 「inputBlobPath」 複製到 「outputBlobPath」,如您在變數中指定的。
以下是範例輸出:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
清除資源
若要刪除資料處理站,請將下列程式碼新增至程式:
adf_client.factories.delete(rg_name, df_name)
相關內容
此範例中的管線會將資料從一個位置複製到 Azure Blob 儲存體中的另一個位置。 請流覽教學 課程 ,以瞭解在更多案例中使用 Data Factory。