البدء السريع: إنشاء مصنع بيانات وخط أنابيب باستخدام Python

ينطبق على: Azure Data Factory Azure Synapse Analytics

تلميح

Data Factory في Microsoft Fabric هو الجيل القادم من Azure Data Factory، مع بنية أبسط، وذكاء اصطناعي مدمج، وميزات جديدة. إذا كنت جديدا في تكامل البيانات، ابدأ مع Fabric Data Factory. يمكن لأعباء عمل ADF الحالية الترقية إلى Fabric للوصول إلى قدرات جديدة في علوم البيانات، والتحليلات اللحظية، والتقارير.

في هذا البدء السريع، تنشئ مصنع بيانات باستخدام Python. يقوم خط الأنابيب في هذا المصنع بنسخ البيانات من مجلد إلى آخر في تخزين Azure Blob.

Azure Data Factory هي خدمة تكامل بيانات قائمة على السحابة تتيح لك إنشاء سير عمل قائم على البيانات لتنظيم وأتمتة حركة البيانات وتحويلها. باستخدام Azure Data Factory، يمكنك إنشاء وجدولة سير عمل قائم على البيانات، تسمى خطوط الأنابيب.

يمكن للتدفقات استيعاب البيانات من مخازن البيانات المتباينة. تقوم خطوط الأنابيب بمعالجة أو تحويل البيانات باستخدام خدمات الحوسبة مثل Azure HDInsight Hadoop وSpark وAzure Data Lake Analytics وAzure Machine Learning. تنشر خطوط الأنابيب بيانات الإخراج إلى مخازن البيانات مثل Azure Synapse Analytics for Business Intelligence (BI).

المتطلبات الأساسية

  • حساب Azure مع اشتراك نشط. أنشئ حسابًا مجانًا.

  • Python 3.6+.

  • حساب Azure Storage.

  • Azure Storage Explorer (اختياري).

  • طلب في Microsoft Entra ID. أنشئ التطبيق باتباع الخطوات الواردة في هذا الرابط، باستخدام خيار المصادقة 2 (سر التطبيق)، وقم بتعيين التطبيق إلى دور المساهم باتباع الإرشادات الواردة في نفس المقالة. دون القيم التالية كما هو موضح في المقالة لاستخدامها في الخطوات اللاحقة: معرف التطبيق (العميل) والقيمة السرية للعميل ومعرف المستأجر.

إنشاء ملف إدخال وتحميله

  1. قم بإطلاق Notepad. نسخ النص التالي وحفظه كملف input.txt على قرص التخزين.

    John|Doe
    Jane|Doe
    
  2. استخدم أدوات مثل Azure Storage Explorer لإنشاء حاوية adfv2tutorial، ومجلد input داخل الحاوية. ثم قم بتحميل ملف input.txt إلى مجلد الإدخال .

تثبيت حزمة Python

  1. افتح وحدة طرفية أو موجه الأوامر باستخدام امتيازات المسؤول. 

  2. أولا، قم بتثبيت حزمة Python لموارد إدارة Azure:

    pip install azure-mgmt-resource
    
  3. لتثبيت حزمة Python الخاصة ب Data Factory، قم بتشغيل الأمر التالي:

    pip install azure-mgmt-datafactory
    

    تدعم حزمة تطوير البرمجيات Python لData Factory Python 2.7 و3.6+.

  4. لتثبيت حزمة Python لمصادقة Azure Identity، قم بتشغيل الأمر التالي:

    pip install azure-identity
    

    إشعار

    قد تتعارض حزمة "azure-identity" مع "azure-cli" في بعض التبعيات الشائعة. إذا واجهت أي مشكلة تتعلق بالمصادقة، فعليك بإزالة "azure-cli" وتوابعها، أو استخدم جهازًا نظيفًا بدون تثبيت حزمة "azure-cli" لكي تعمل. بالنسبة للسحب ذات سيادة، يجب عليك استخدام الثوابت المناسبة الخاصة بالسحابة. يرجى الرجوع إلى Connect لجميع المناطق باستخدام مكتبات Azure ل Python Multi-cloud | Microsoft Docs تعليمات للاتصال ب Python في Sovereign Clouds.

كيفية إنشاء مصنع بيانات العميل

  1. إنشاء ملف باسم datafactory.py. أضف العبارات التالية لإضافة مراجع إلى مساحات الأسماء.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. أضف الدلالات التالية التي تطبع المعلومات.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مثيلا لفئة DataFactoryManagementClient. يمكنك استخدام هذا الكائن لإنشاء مصنع بيانات، وخدمة مرتبطة، ومجموعات بيانات وتدفق. يمكنك أيضًا استخدام هذا الكائن لمراقبة تفاصيل تشغيل البنية الأساسية. قم بتعيين متغير subscription_id على معرف اشتراكك Azure. للحصول على قائمة بالمناطق Azure التي يتوفر فيها Data Factory حاليا، اختر المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع Analytics لتحديد موقع Data Factory: المنتجات المتاحة حسب المنطقة. مخازن البيانات (Azure Storage، Azure SQL Database، إلخ) والحسابات (HDInsight، إلخ) المستخدمة في Data Factory يمكن أن تكون في مناطق أخرى.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

إنشاء مصدرًا للبيانات

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مصنع بيانات. إذا كانت مجموعة الموارد موجودة بالفعل، فيمكنك التعليق على أول create_or_update نص.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

إنشاء خدمة مرتبطة

أضف الكود التالي إلى طريقة Main التي تنشئ خدمة مرتبطة Azure Storage.

إنشاء خدمات مرتبطة في مصنع بيانات لربط مخازن بياناتك وحساب الخدمات إلى مصنع البيانات. في هذه البداية السريعة، تحتاج فقط إلى إنشاء خدمة Azure Storage مرتبطة واحدة كمصدر نسخ ومخزن مصرف، تسمى "AzureStorageLinkedService" في العينة. استبدل <storageaccountname> و <storageaccountkey> باسم ومفتاح حسابك Azure Storage.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

إنشاء datasets

في هذا القسم، يمكنك إنشاء مجموعتي بيانات: واحدة للمصدر، والأخرى لمواضع التلقي.

Create a dataset for source Azure Blob

أضف الكود التالي إلى الطريقة الرئيسية التي تنشئ مجموعة بيانات Azure blob. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة Azure blob connector.

أنت تعرف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات Blob هذه إلى الخدمة المرتبطة ب Azure Storage التي أنشأتها في الخطوة السابقة.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Create a dataset for sink Azure Blob

أضف الكود التالي إلى الطريقة الرئيسية التي تنشئ مجموعة بيانات Azure blob. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة Azure blob connector.

أنت تعرف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات Blob هذه إلى الخدمة المرتبطة ب Azure Storage التي أنشأتها في الخطوة السابقة.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ البنية الأساسية لبرنامج ربط العمليات التجارية مع نشاط نسخ.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

إنشاء تشغيل البنية الأساسية

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي يقوم بتشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

مراقبة تشغيل البنية الأساسية

لمراقبة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، أضف التعليمات البرمجية التالية الأسلوب الرئيسي :

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

الآن، أضف النص التالي لاستدعاء الأسلوب Main عند تشغيل البرنامج:

# Start the main method
main()

البرنامج النصي الكامل

إليك الكود الكامل لل Python:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

تشغيل التعليمات البرمجية

أنشئ وابدأ تشغيل التطبيق، ثم تحقق من تنفيذ المسار.

تقوم وحدة التحكم بطباعة تقدم إنشاء مصنع البيانات والخدمة المرتبطة ومجموعات البيانات والبنية الأساسية وتشغيلها. انتظر حتى ترى تفاصيل تشغيل نشاط النسخ مع حجم البيانات المقروءة / المكتوبة. ثم استخدم أدوات مثل Azure Storage explorer للتحقق من أن الكتلة (الblob) منسوخة إلى "outputBlobPath" من "inputBlobPath" كما حددت في المتغيرات.

فيما يلي ناتج العينة:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

تنظيف الموارد

لحذف مصنع البيانات، أضف التعليمات البرمجية التالية إلى البرنامج:

adf_client.factories.delete(rg_name, df_name)

ينسخ خط الأنابيب في هذه العينة البيانات من موقع إلى آخر في مخزن كتلة Azure. انتقل إلىtutorials للتعرف على استخدام Data Factory في المزيد من السيناريوهات.