إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على:
Azure Data Factory
Azure Synapse Analytics
تلميح
Data Factory في Microsoft Fabric هو الجيل القادم من Azure Data Factory، مع بنية أبسط، وذكاء اصطناعي مدمج، وميزات جديدة. إذا كنت جديدا في تكامل البيانات، ابدأ مع Fabric Data Factory. يمكن لأعباء عمل ADF الحالية الترقية إلى Fabric للوصول إلى قدرات جديدة في علوم البيانات، والتحليلات اللحظية، والتقارير.
في هذا البدء السريع، تنشئ مصنع بيانات باستخدام Python. يقوم خط الأنابيب في هذا المصنع بنسخ البيانات من مجلد إلى آخر في تخزين Azure Blob.
Azure Data Factory هي خدمة تكامل بيانات قائمة على السحابة تتيح لك إنشاء سير عمل قائم على البيانات لتنظيم وأتمتة حركة البيانات وتحويلها. باستخدام Azure Data Factory، يمكنك إنشاء وجدولة سير عمل قائم على البيانات، تسمى خطوط الأنابيب.
يمكن للتدفقات استيعاب البيانات من مخازن البيانات المتباينة. تقوم خطوط الأنابيب بمعالجة أو تحويل البيانات باستخدام خدمات الحوسبة مثل Azure HDInsight Hadoop وSpark وAzure Data Lake Analytics وAzure Machine Learning. تنشر خطوط الأنابيب بيانات الإخراج إلى مخازن البيانات مثل Azure Synapse Analytics for Business Intelligence (BI).
المتطلبات الأساسية
حساب Azure مع اشتراك نشط. أنشئ حسابًا مجانًا.
Azure Storage Explorer (اختياري).
طلب في Microsoft Entra ID. أنشئ التطبيق باتباع الخطوات الواردة في هذا الرابط، باستخدام خيار المصادقة 2 (سر التطبيق)، وقم بتعيين التطبيق إلى دور المساهم باتباع الإرشادات الواردة في نفس المقالة. دون القيم التالية كما هو موضح في المقالة لاستخدامها في الخطوات اللاحقة: معرف التطبيق (العميل) والقيمة السرية للعميل ومعرف المستأجر.
إنشاء ملف إدخال وتحميله
قم بإطلاق Notepad. نسخ النص التالي وحفظه كملف input.txt على قرص التخزين.
John|Doe Jane|Doeاستخدم أدوات مثل Azure Storage Explorer لإنشاء حاوية adfv2tutorial، ومجلد input داخل الحاوية. ثم قم بتحميل ملف input.txt إلى مجلد الإدخال .
تثبيت حزمة Python
افتح وحدة طرفية أو موجه الأوامر باستخدام امتيازات المسؤول.
أولا، قم بتثبيت حزمة Python لموارد إدارة Azure:
pip install azure-mgmt-resourceلتثبيت حزمة Python الخاصة ب Data Factory، قم بتشغيل الأمر التالي:
pip install azure-mgmt-datafactoryتدعم حزمة تطوير البرمجيات Python لData Factory Python 2.7 و3.6+.
لتثبيت حزمة Python لمصادقة Azure Identity، قم بتشغيل الأمر التالي:
pip install azure-identityإشعار
قد تتعارض حزمة "azure-identity" مع "azure-cli" في بعض التبعيات الشائعة. إذا واجهت أي مشكلة تتعلق بالمصادقة، فعليك بإزالة "azure-cli" وتوابعها، أو استخدم جهازًا نظيفًا بدون تثبيت حزمة "azure-cli" لكي تعمل. بالنسبة للسحب ذات سيادة، يجب عليك استخدام الثوابت المناسبة الخاصة بالسحابة. يرجى الرجوع إلى Connect لجميع المناطق باستخدام مكتبات Azure ل Python Multi-cloud | Microsoft Docs تعليمات للاتصال ب Python في Sovereign Clouds.
كيفية إنشاء مصنع بيانات العميل
إنشاء ملف باسم datafactory.py. أضف العبارات التالية لإضافة مراجع إلى مساحات الأسماء.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeأضف الدلالات التالية التي تطبع المعلومات.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مثيلا لفئة DataFactoryManagementClient. يمكنك استخدام هذا الكائن لإنشاء مصنع بيانات، وخدمة مرتبطة، ومجموعات بيانات وتدفق. يمكنك أيضًا استخدام هذا الكائن لمراقبة تفاصيل تشغيل البنية الأساسية. قم بتعيين متغير subscription_id على معرف اشتراكك Azure. للحصول على قائمة بالمناطق Azure التي يتوفر فيها Data Factory حاليا، اختر المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع Analytics لتحديد موقع Data Factory: المنتجات المتاحة حسب المنطقة. مخازن البيانات (Azure Storage، Azure SQL Database، إلخ) والحسابات (HDInsight، إلخ) المستخدمة في Data Factory يمكن أن تكون في مناطق أخرى.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
إنشاء مصدرًا للبيانات
أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مصنع بيانات. إذا كانت مجموعة الموارد موجودة بالفعل، فيمكنك التعليق على أول create_or_update نص.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
إنشاء خدمة مرتبطة
أضف الكود التالي إلى طريقة Main التي تنشئ خدمة مرتبطة Azure Storage.
إنشاء خدمات مرتبطة في مصنع بيانات لربط مخازن بياناتك وحساب الخدمات إلى مصنع البيانات. في هذه البداية السريعة، تحتاج فقط إلى إنشاء خدمة Azure Storage مرتبطة واحدة كمصدر نسخ ومخزن مصرف، تسمى "AzureStorageLinkedService" في العينة. استبدل <storageaccountname> و <storageaccountkey> باسم ومفتاح حسابك Azure Storage.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
إنشاء datasets
في هذا القسم، يمكنك إنشاء مجموعتي بيانات: واحدة للمصدر، والأخرى لمواضع التلقي.
Create a dataset for source Azure Blob
أضف الكود التالي إلى الطريقة الرئيسية التي تنشئ مجموعة بيانات Azure blob. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة Azure blob connector.
أنت تعرف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات Blob هذه إلى الخدمة المرتبطة ب Azure Storage التي أنشأتها في الخطوة السابقة.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Create a dataset for sink Azure Blob
أضف الكود التالي إلى الطريقة الرئيسية التي تنشئ مجموعة بيانات Azure blob. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة Azure blob connector.
أنت تعرف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات Blob هذه إلى الخدمة المرتبطة ب Azure Storage التي أنشأتها في الخطوة السابقة.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية
أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ البنية الأساسية لبرنامج ربط العمليات التجارية مع نشاط نسخ.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
إنشاء تشغيل البنية الأساسية
أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي يقوم بتشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
مراقبة تشغيل البنية الأساسية
لمراقبة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، أضف التعليمات البرمجية التالية الأسلوب الرئيسي :
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
الآن، أضف النص التالي لاستدعاء الأسلوب Main عند تشغيل البرنامج:
# Start the main method
main()
البرنامج النصي الكامل
إليك الكود الكامل لل Python:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
تشغيل التعليمات البرمجية
أنشئ وابدأ تشغيل التطبيق، ثم تحقق من تنفيذ المسار.
تقوم وحدة التحكم بطباعة تقدم إنشاء مصنع البيانات والخدمة المرتبطة ومجموعات البيانات والبنية الأساسية وتشغيلها. انتظر حتى ترى تفاصيل تشغيل نشاط النسخ مع حجم البيانات المقروءة / المكتوبة. ثم استخدم أدوات مثل Azure Storage explorer للتحقق من أن الكتلة (الblob) منسوخة إلى "outputBlobPath" من "inputBlobPath" كما حددت في المتغيرات.
فيما يلي ناتج العينة:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
تنظيف الموارد
لحذف مصنع البيانات، أضف التعليمات البرمجية التالية إلى البرنامج:
adf_client.factories.delete(rg_name, df_name)
المحتوى ذو الصلة
ينسخ خط الأنابيب في هذه العينة البيانات من موقع إلى آخر في مخزن كتلة Azure. انتقل إلىtutorials للتعرف على استخدام Data Factory في المزيد من السيناريوهات.