適用於:
Azure Data Factory
Azure Synapse Analytics
秘訣
Data Factory in Microsoft Fabric 是下一代的 Azure Data Factory,擁有更簡單的架構、內建 AI 及新功能。 如果你是資料整合新手,建議先從 Fabric Data Factory 開始。 現有的 ADF 工作負載可升級至 Fabric,以存取資料科學、即時分析與報告等新能力。
在這個快速入門中,你會使用 Python 建立一個資料工廠。 這個資料工廠中的管線會將資料從一個資料夾複製到另一個 Azure Blob 儲存資料夾。
Azure Data Factory 是一項基於雲端的資料整合服務,讓您能建立以資料驅動的工作流程,以協調與自動化資料移動與資料轉換。 使用 Azure Data Factory,您可以建立並排程資料驅動的工作流程,稱為管線。
管線可從不同資料存放區擷取資料。 管線透過使用計算服務如 Azure HDInsight Hadoop、Spark、Azure Data Lake Analytics 及 Azure Machine Learning 來處理或轉換資料。 管線會將輸出資料發布到資料儲存庫,例如用於商業智慧(BI)應用的 Azure Synapse Analytics。
必要條件
一個有有效訂閱的 Azure 帳號。 免費建立一個。
Azure 儲存體總管(可選)。
Microsoft Entra ID 中的應用。 遵循此連結中的步驟,使用驗證選項 2 (應用程式祕密) 來建立應用程式,並遵循相同文章中的指示將應用程式指派給「參與者」角色。 記下下列本文所示的值,以用於稍後的步驟:應用程式 (用戶端) 識別碼、用戶端祕密值和租用戶識別碼。
建立及上傳輸入檔案
啟動記事本。 複製下列文字,並在磁碟上儲存為 input.txt 檔案。
John|Doe Jane|Doe使用像 Azure 儲存體總管 這類工具,建立容器中的 adfv2tutorial 容器,以及容器中的 input 資料夾。 然後,將 input.txt 檔案上傳至 input 資料夾。
安裝 Python 套件
以系統管理員權限開啟終端機或命令提示字元。
首先,安裝 Azure 管理資源的 Python 套件:
pip install azure-mgmt-resource要安裝 Data Factory 的 Python 套件,請執行以下指令:
pip install azure-mgmt-datafactoryData Factory 的 Python SDK 支援 Python 2.7 和 3.6+。
要安裝 Python 套件以進行 Azure 身份驗證,請執行以下指令:
pip install azure-identity注意
"azure-identity" 套件可能會在某些常見的相依性上與 "azure-cli" 發生衝突。 如果您遇到任何驗證問題,請移除 "azure-cli" 及其相依性,或使用未安裝 "azure-cli" 套件的全新機器來使其正常執行。 針對主權雲端,您必須使用適當的雲端特定常數。 請參考 使用 Azure 函式庫在 Python 多雲環境中連接至所有區域 | Microsoft Docs 以獲取在主權雲中使用 Python 連線的指導說明。
建立資料處理站用戶端
建立名為 datafactory.py 的檔案。 新增下列陳述式以將參考新增至命名空間。
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time新增會列印資訊的下列函式。
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))將下列程式碼新增至 Main 方法,以建立 DataFactoryManagementClient 類別的執行個體。 您會使用此物件來建立資料處理站、連結服務、資料集和管線。 您也可以使用此物件來監視管線執行的詳細資訊。 將 subscription_id 變數設為你Azure訂閱的 ID。 如需查詢目前 Data Factory 可用的Azure區域清單,請在下一頁選擇您感興趣的區域,然後展開 Analytics 以找到 Data Factory:Products by region。 資料工廠使用的資料儲存(Azure 儲存體、Azure SQL Database 等)和運算(HDInsight 等)可能在其他區域。
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
建立資料處理站
將下列程式碼新增至 Main 方法,以建立資料處理站。 如果您的資源群組已經存在,請將第一個 create_or_update 陳述式註解掉。
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
建立連結的服務
在 Main 方法中加入以下程式碼,建立一個 Azure 儲存體 連結服務。
您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在這個快速入門中,你只需要建立一個 Azure 儲存體 連結服務,作為複製來源和匯入儲存庫,範例中名為「AzureStorageLinkedService」。 將 <storageaccountname> 和 <storageaccountkey> 替換成你Azure 儲存體帳號的名稱和金鑰。
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
建立資料集
在本節中,您會建立兩個資料集:一個作為來源資料,另一個作為匯入資料。
為來源 Azure Blob 儲存體建立資料集
在 Main 方法中加入以下程式碼,該方法建立 Azure blob 資料集。 欲了解Azure Blob 資料集的屬性,請參閱 Azure blob connector 條目。
你定義一個代表 Azure Blob 來源資料的資料集。 這個 Blob 資料集指的是你在前一步建立的 Azure 儲存體 連結服務。
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
為接收器 Azure Blob 建立資料集
在 Main 方法中加入以下程式碼,該方法建立 Azure blob 資料集。 欲了解Azure Blob 資料集的屬性,請參閱 Azure blob connector 條目。
你定義一個代表 Azure Blob 來源資料的資料集。 這個 Blob 資料集指的是你在前一步建立的 Azure 儲存體 連結服務。
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
建立新管線
將下列程式碼新增至 Main 方法,以建立具有複製活動的管線。
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
建立管線執行
將下列程式碼新增至 Main 方法,以觸發管線執行。
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
監視管線執行
若要監視管道執行,將下列程式碼新增至 Main 方法:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
現在,新增下列陳述式以在執行程式時叫用 main 方法:
# Start the main method
main()
完整指令碼
以下是完整的 Python 程式碼:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
執行程式碼
建置並啟動應用程式,然後確認管線執行。
主控台會印出建立資料處理站、連結服務、資料集、管線和管線執行的進度。 請等待直到您看到複製活動的執行詳細資料及數據讀取/寫入的大小。 接著,使用 Azure 儲存體總管等工具,檢查 Blob 是否依您在變數中指定的方式,從 inputBlobPath 複製到 outputBlobPath。
以下是範例輸出:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
清除資源
若要刪除資料處理站,請將下列程式碼新增至程式:
adf_client.factories.delete(rg_name, df_name)
相關內容
本範例中的管線會將資料從一個位置複製到 Azure Blob 儲存體中的另一個位置。 瀏覽教學課程以了解使用 Data Factory 的更多案例。