التشغيل السريع: إنشاء مصنع بيانات وبنية أساسية باستخدام Python

ينطبق على:Azure Data Factory Azure Synapse Analytics

تلميح

جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!

في هذه البداية السريعة، يمكنك إنشاء مصنع بيانات باستخدام Python. ينسخ التدفق الموجود في مصنع البيانات هذا البيانات من مجلد إلى مجلد آخر في تخزين Azure Blob.

Azure Data Factory عبارة عن خدمة تكامل بيانات مستندة إلى مجموعة النظراء تتيح لك إنشاء مهام سير عمل تعتمد على البيانات لتنظيم حركة البيانات وتحويلها وأتمتتها. باستخدام Azure Data Factory، يمكنك إنشاء وجدولة مهام سير عمل تعتمد على البيانات، تُسمى التدفقات.

يمكن للتدفقات استيعاب البيانات من مخازن البيانات المتباينة. معالجة التدفقات أو تحويلها باستخدام خدمات الحوسبة مثل Azure HDInsight Hadoop وSpark وAzure Data Lake Analytics وAzure Machine Learning. تنشر التدفقات الإخراج إلى مخازن البيانات مثل Azure Synapse Analytics لتطبيقات ذكاء الأعمال (BI).

المتطلبات الأساسية

  • حساب Azure مع اشتراك نشط. أنشئ حسابًا مجانًا.

  • Python 3.6+.

  • حساب Azure Storage.

  • Azure Storage Explorer (اختياري).

  • تطبيق في معرف Microsoft Entra. أنشئ التطبيق باتباع الخطوات الواردة في هذا الرابط، باستخدام خيار المصادقة 2 (سر التطبيق)، وقم بتعيين التطبيق إلى دور المساهم باتباع الإرشادات الواردة في نفس المقالة. دون القيم التالية كما هو موضح في المقالة لاستخدامها في الخطوات اللاحقة: معرف التطبيق (العميل) والقيمة السرية للعميل ومعرف المستأجر.

إنشاء ملف إدخال وتحميله

  1. قم بإطلاق Notepad. نسخ النص التالي وحفظه كملف input.txt على قرص التخزين.

    John|Doe
    Jane|Doe
    
  2. استخدم أدوات مثل Azure Storage Explorer لإنشاء حاوية adfv2tutorial ومجلد الإدخال في الحاوية. ثم قم بتحميل ملف input.txt إلى مجلد الإدخال .

تثبيت حِزمة Python

  1. افتح وحدة طرفية أو موجه الأوامر باستخدام امتيازات المسؤول. 

  2. أولاً، قم بتثبيت حزمة Python لموارد إدارة Azure:

    pip install azure-mgmt-resource
    
  3. لتثبيت حزمة Python لـ Data Factory، قم بتشغيل الأمر التالي:

    pip install azure-mgmt-datafactory
    

    يدعم Python SDK ل Data Factory Python 2.7 و3.6+.

  4. لتثبيت حزمة Python لمصادقة هوية Azure، قم بتشغيل الأمر التالي:

    pip install azure-identity
    

    إشعار

    قد تتعارض حزمة "azure-identity" مع "azure-cli" في بعض التبعيات الشائعة. إذا واجهت أي مشكلة تتعلق بالمصادقة، فعليك بإزالة "azure-cli" وتوابعها، أو استخدم جهازًا نظيفًا بدون تثبيت حزمة "azure-cli" لكي تعمل. بالنسبة للسحب ذات سيادة، يجب عليك استخدام الثوابت المناسبة الخاصة بالسحابة. يرجى الرجوع إلى الاتصال إلى جميع المناطق التي تستخدم مكتبات Azure ل Python Multi-cloud | Microsoft Docs للحصول على إرشادات للاتصال ب Python في السحب ذات السيادة.

كيفية إنشاء مصنع بيانات العميل

  1. إنشاء ملف باسم datafactory.py. أضف العبارات التالية لإضافة مراجع إلى مساحات الأسماء.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. أضف الدلالات التالية التي تطبع المعلومات.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مثيلا لفئة DataFactoryManagementClient. يمكنك استخدام هذا الكائن لإنشاء مصنع بيانات، وخدمة مرتبطة، ومجموعات بيانات وتدفق. يمكنك أيضًا استخدام هذا الكائن لمراقبة تفاصيل تشغيل البنية الأساسية. تعيين متغير subscription_id إلى معرف اشتراك Azure الخاص بك. للحصول على قائمة بمناطق Azure التي يتوفر فيها حالياً Data Factory، حدد المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع "Analytics" لتحديد موقع Data Factory: "Products available by region". تخزن البيانات (Azure Storage، وAzure SQL Database، وما إلى ذلك) وتحسب (HDInsight، وما إلى ذلك) التي يستخدمها مصنع البيانات في مناطق أخرى.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

إنشاء مصدرًا للبيانات

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ مصنع بيانات. إذا كانت مجموعة الموارد موجودة بالفعل، فيمكنك التعليق على أول create_or_update نص.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

إنشاء خدمة مرتبطة

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ خدمة مرتبطة ب Azure Storage.

إنشاء خدمات مرتبطة في مصنع بيانات لربط مخازن بياناتك وحساب الخدمات إلى مصنع البيانات. في هذا التشغيل السريع، تحتاج فقط إلى إنشاء خدمة واحدة مرتبطة بالتخزين في Azure كمصدر للنسخ والتلقي، والذي يطلق عليه "AzureStorageLinkedService" في العينة. استبدل <storageaccountname> و <storageaccountkey> باسم ومفتاح حساب Azure Storage الخاص بك.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

إنشاء datasets

في هذا القسم، يمكنك إنشاء مجموعتي بيانات: واحدة للمصدر، والأخرى لمواضع التلقي.

إنشاء مجموعة بيانات لمصدر Azure Blob

إضافة التعليمات البرمجية التالية إلى الوسيلة "Main" التي تنشئ مجموعة بيانات كائن Azure. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة موصل Azure blob.

يمكنك تعريف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات الكائنات هذه إلى خدمة مخزن Azure المرتبطة التي تقوم بإنشائها في الخطوة السابقة

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

إنشاء مجموعة بيانات لنقطة Azure Blob

إضافة التعليمات البرمجية التالية إلى الوسيلة "Main" التي تنشئ مجموعة بيانات كائن Azure. للحصول على معلومات حول خصائص مجموعة بيانات Azure Blob، راجع مقالة موصل Azure blob.

يمكنك تعريف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات الكائنات هذه إلى خدمة مخزن Azure المرتبطة التي تقوم بإنشائها في الخطوة السابقة

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي ينشئ البنية الأساسية لبرنامج ربط العمليات التجارية مع نشاط نسخ.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

إنشاء تشغيل البنية الأساسية

أضف التعليمات البرمجية التالية إلى الأسلوب الرئيسي الذي يقوم بتشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

مراقبة تشغيل البنية الأساسية

لمراقبة تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، أضف التعليمات البرمجية التالية الأسلوب الرئيسي :

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

الآن، أضف النص التالي لاستدعاء الأسلوب Main عند تشغيل البرنامج:

# Start the main method
main()

البرنامج النصي الكامل

إليك التعليمات البرمجية الكاملة الخاصة بلغة Python:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

تشغيل التعليمات البرمجية

أنشئ وابدأ تشغيل التطبيق، ثم تحقق من تنفيذ المسار.

تقوم وحدة التحكم بطباعة تقدم إنشاء مصنع البيانات والخدمة المرتبطة ومجموعات البيانات والبنية الأساسية وتشغيلها. انتظر حتى ترى تفاصيل تشغيل نشاط النسخ مع حجم البيانات المقروءة / المكتوبة. ثم استخدم أدوات مثل Azure Storage Explorer للتحقق من نسخ كائنات ثنائية كبير الحجم إلى "outputBlobPath" من "inputBlobPath" كما حددته في المتغيرات.

فيما يلي ناتج العينة:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

تنظيف الموارد

لحذف مصنع البيانات، أضف التعليمات البرمجية التالية إلى البرنامج:

adf_client.factories.delete(rg_name, df_name)

تنسخ البنية الأساسية في هذه العينة البيانات من موقع إلى موقع آخر في تخزين Azure blob. انتقل إلىtutorials للتعرف على استخدام Data Factory في المزيد من السيناريوهات.