تحميل البيانات تدريجياً من جداول متعددة في SQL Server إلى قاعدة بيانات Azure SQL

ينطبق على: Azure Data Factory Azure Synapse Analytics

تلميح

جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!

في هذا البرنامج التعليمي، يمكنك إنشاء مصنع بيانات Azure مع خط أنابيب الذي يحمل بيانات دلتا من جداول متعددة في قاعدة بيانات SQL Server إلى قاعدة بيانات SQL Azure.

نفذ الخطوات التالية في هذا البرنامج التعليمي:

  • إعداد مخازن بيانات المصدر والوجهة.
  • إنشاء data factory.
  • إنشاء وقت تشغيل تكامل الاستضافة الذاتية.
  • ثبت أداة وقت تشغيل التكامل.
  • أنشئ الخدمة ذات الصلة.
  • أنشئ مجموعات بيانات المورد والمصدر والعلامة المائية.
  • إنشاء خط أنابيب وتشغيله ومراقبة.
  • راجع النتائج.
  • إضافة بيانات أو تحديثها في جداول المصدر.
  • مراقبة تشغيل المسار.
  • راجع النتائج التالية.

نظرة عامة

فيما يلي الخطوات الهامة لإنشاء هذا الحل:

  1. حدد عمود العلامة المائية.

    حدد عمودا واحدا لكل جدول في مخزن البيانات المصدر، والذي يمكنك تحديد السجلات الجديدة أو المحدثة لكل تشغيل. عادةً ما تستمر البيانات الموجودة في هذا العمود المحدد في الزيادة (على سبيل المثال، last_modify_time أو معرف) عند إنشاء صفوف أو تحديثها. تستخدم أعلى قيمة في هذا العمود كعلامة مائية.

  2. أعد مخزن بيانات لتخزين قيمة العلامة المائية.

    في هذا البرنامج التعليمي، يمكنك تخزين قيمة العلامة المائية في قاعدة بيانات SQL.

  3. إنشاء خط أنابيب مع الأنشطة التالية:

    1. إنشاء نشاط ForEach الذي يبتكر من خلال قائمة أسماء الجداول المصدر التي يتم تمريرها كمعلمة إلى خط الأنابيب. لكل جدول مصدر استدعاء الأنشطة التالية لتنفيذ تحميل دلتا لهذا الجدول.

    2. إنشاء نشاطين من أنشطة البحث. استخدم نشاط البحث الأول لاسترداد قيمة العلامة المائية الأخيرة. استخدم نشاط البحث الثاني لاسترداد قيمة العلامة المائية الجديدة. يتم تمرير قيم هذه العلامات المائية إلى نشاط النسخ.

    3. أنشئ نشاط نسخ ينسخ صفوف من مخزن بيانات المصدر مع إدخال قيمة لعمود العلامة المائية أكبر من قيمة العلامة المائية القديمة وأقل من قيمة العلامة المائية الجديدة أو يساويها. بعد ذلك، ينسخ بيانات "دلتا" من مخزن بيانات المصدر إلى تخزين Azure Blob كملف جديد.

    4. أنشئ نشاط StoredProcedure الذي يحدث قيمة العلامة المائية للمسار الذي سيتم تشغيله في المرة القادمة.

    فيما يلي رسم تخطيطي لحل رفيع المستوى:

    تحميل البيانات على نحو تدريجي

في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.

المتطلبات الأساسية

  • SQL Server. يمكنك استخدام قاعدة بيانات SQL Server كمخزن البيانات المصدر في هذا البرنامج التعليمي.
  • Azure SQL Database. يمكنك استخدام قاعدة بيانات في قاعدة بيانات SQL Azure كمخزن بيانات مصدر. إذا لم تكن لديك قاعدة بيانات SQL، فراجع إنشاء قاعدة بيانات في Azure SQL Database لمعرفة خطوات إنشاء قاعدة بيانات أخرى.

إنشاء جداول المصدر في قاعدة بيانات SQL Server

  1. افتح SQL Server Management Studio (SSMS) أو Azure Data Studio، ثم اتصل بقاعدة بيانات SQL Server.

  2. في مستكشف الملقم (SSMS) أو في جزء الاتصالات (Azure Data Studio)، انقر بزر الماوس الأيمن فوق قاعدة البيانات واختر استعلام جديد.

  3. تشغيل الأمر SQL التالية مقابل قاعدة البيانات لإنشاء جداول customer_table المسماة project_table و:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

إنشاء جداول الوجهة في قاعدة بيانات SQL Azure

  1. افتح SQL Server Management Studio (SSMS) أو Azure Data Studio، ثم اتصل بقاعدة بيانات SQL Server.

  2. في مستكشف الملقم (SSMS) أو في جزء الاتصالات (Azure Data Studio)، انقر بزر الماوس الأيمن فوق قاعدة البيانات واختر استعلام جديد.

  3. تشغيل الأمر SQL التالية مقابل قاعدة البيانات لإنشاء جداول customer_table المسماة project_table و:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

أنشئ جدول أخر في قاعدة بيانات SQL لتخزين أعلى قيمة للعلامة المائية

  1. شغل أمر SQL التالي مقابل قاعدة بياناتك في SQL لإنشاء جدول باسم watermarktableلتخزين قيمة العلامة المائية:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. إدراج قيم العلامة المائية الأولية لكلا الجدولين المصدرين في جدول العلامة المائية.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

إنشاء إجراء مخزن في قاعدة بيانات SQL Azure

تشغيل الأمر التالي لإنشاء إجراء مخزن في قاعدة البيانات الخاصة بك. هذا الإجراء المخزن بتحديث قيمة العلامة المائية بعد تشغيل كل خط أنابيب.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

إنشاء أنواع البيانات والإجراءات المخزنة الإضافية في قاعدة بيانات azure SQL

تشغيل الاستعلام التالي لإنشاء اثنين من الإجراءات المخزنة ونوعي بيانات في قاعدة البيانات الخاصة بك. يتم استخدامها لدمج البيانات من جداول المصدر في جداول الوجهة.

من أجل جعل الرحلة سهلة لتبدأ، ونحن نستخدم مباشرة هذه الإجراءات المخزنة تمرير البيانات دلتا في عبر متغير الجدول ومن ثم دمج لهم في مخزن الوجهة. كن حذرا فإنه لا تتوقع عدد "كبير" من صفوف دلتا (أكثر من 100) ليتم تخزينها في متغير الجدول.

إذا كنت بحاجة إلى دمج عدد كبير من صفوف "دلتا" في المتجر الوجهة، فإننا نقترح عليك استخدام نشاط النسخ لنسخ جميع بيانات "دلتا" إلى جدول "مرحلي" مؤقت في المتجر الوجهة أولاً، ثم إنشاء الإجراء المخزن الخاص بك بدون استخدام متغير الجدول لدمجها من جدول "التدريج" إلى الجدول "النهائي".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

تثبيت أحدث الوحدات النمطية Azure PowerShell باتباع الإرشادات الموجودة في تثبيت وتكوين Azure PowerShell.

إنشاء مصدرًا للبيانات

  1. حدد متغيراً لاسم مجموعة الموارد الذي ستستخدمه لاحقاً في أوامر PowerShell. انسخ الأمر النصي التالي إلى PowerShell، حدد اسماً لمجموعة موارد Azure وأحطها بعلامات اقتباس مزدوجة، ثم شغل الأمر. مثال على ذلك "adfrg" .

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    إذا كانت مجموعة الموارد موجودة بالفعل، فقد لا ترغب في الكتابة فوقها. عين قيمة مختلفة $resourceGroupName للمتغير وشغل الأمر مرة أخرى.

  2. حدد متغير لموقع بيانات المصنع.

    $location = "East US"
    
  3. لإنشاء مجموعة موارد Azure، شغل الأمر التالي:

    New-AzResourceGroup $resourceGroupName $location
    

    إذا كانت مجموعة الموارد موجودة بالفعل، فقد لا ترغب في الكتابة فوقها. عين قيمة مختلفة $resourceGroupName للمتغير وشغل الأمر مرة أخرى.

  4. حدد متغير لموقع بيانات المصنع.

    هام

    حدث اسم مصنع البيانات باسم عمومي فريد. مثال على ذلك هو ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. لإنشاء بيانات المصنع شغل التالي Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

لاحظ النقاط التالية:

  • يجب أن يكون اسم مصنع البيانات مميزًا وعامًا. إذا استلمت الخطأ التالي، فغير الاسم وحاول مرة أخرى:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • لإنشاء مثيلات لبيانات المصنع، يجب أن يكون حساب المستخدم الذي تستخدمه لتسجيل الدخول إلى Azure مشتركاً في Azure أو له دول المالك أو مسؤول الاشتراك في Azure.

  • للحصول على قائمة بمناطق Azure التي يتوفر فيها حالياً Data Factory، حدد المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع "Analytics" لتحديد موقع Data Factory: "Products available by region". يمكن أن تكون مخازن البيانات (Storage، SQL Database، Azure SQL Managed Instance، وما إلى ذلك) وحسابات (Azure HDInsight وما إلى ذلك) التي يستخدمها data factory في مناطق أخرى.

إنشاء وقت تشغيل تكامل مستضاف ذاتيا

في هذا القسم، يمكنك إنشاء وقت تشغيل تكامل ذاتي الاستضافة وربطه بجهاز محلي مع قاعدة بيانات SQL Server. وقت تشغيل تكامل الاستضافة الذاتية هو المكون الذي ينسخ البيانات من SQL Server على جهازك إلى Azure SQL Database.

  1. إنشاء متغير لاسم وقت تشغيل التكامل. استخدم اسمًا فريدًا، ودوّن ملاحظة خاصة به. يمكن استخدامه في وقت لاحق في هذا البرنامج التعليمي.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. إنشاء وقت تشغيل تكامل الاستضافة الذاتية.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    فيما يلي ناتج العينة:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. لاسترداد حالة وقت تشغيل التكامل الذي تم إنشاؤه، شغّل الأمر التالي. تأكد من تعيين قيمة خاصية State إلى NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    فيما يلي ناتج العينة:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. لاسترداد مفاتيح المصادقة المُستخدمة في تسجيل وقت تشغيل تكامل الاستضافة الذاتية مع خدمة Azure Data Factory في مجموعة النظراء، شغّل الأمر التالي:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    فيما يلي ناتج العينة:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. انسخ أحد المفاتيح (باستثناء علامات الاقتباس المزدوجة) المُستخدمة لتسجيل وقت تشغيل تكامل الاستضافة الذاتية الذي قمت بتثبيته على جهازك في الخطوات التالية.

ثبت أداة وقت تشغيل التكامل

  1. إذا كان لديك بالفعل وقت تشغيل التكامل على جهازك، فقم بإلغاء تثبيته باستخدام إضافة برامج أو إزالتها.

  2. قم بتنزيل وقت تشغيل التكامل المستضاف ذاتيا على جهاز Windows محلي. شغّل التثبيت.

  3. في صفحة Welcome to Microsoft Integration Runtime Setup ، حدد Next.

  4. في صفحة اتفاقية ترخيص المستخدم النهائي، اقبل الشروط واتفاقية الترخيص، وحدد التالي.

  5. في صفحة مجلد الوجهة، حدد التالي.

  6. في صفحة Ready to install Microsoft Integration Runtime ، حدد Install.

  7. في صفحة Completed the Microsoft Integration Runtime Setup ، حدد Finish.

  8. في صفحة Register Integration Runtime (Self-hosted)، الصق المفتاح الذي حفظته في القسم السابق، وحدد Register.

    تسجيل وقت تشغيل التكامل

  9. في صفحة New Integration Runtime (Self-hosted) Node ، حدد Finish.

  10. ستظهر الرسالة التالية عند تسجيل وقت تشغيل تكامل الاستضافة الذاتية بنجاح:

    تم التسجيل بنجاح

  11. في صفحة Register Integration Runtime (Self-hosted)، حدد Launch Configuration Manager.

  12. عند توصيل العقدة بخدمة مجموعة النظراء، سترى الصفحة التالية:

    صفحة العُقدة متصلة

  13. الآن، اختبر الاتصال بقاعدة بيانات SQL Server.

    علامة تبويب Diagnostics

    أ. في صفحة Configuration Manager ، انتقل إلى علامة التبويب Diagnostics .

    ب. حدد SqlServer لنوع مصدر البيانات.

    جـ. أدخل اسم الخادم.

    د. أدخل اسم قاعدة البيانات.

    هـ. حدد وضع المصادقة.

    و. أدخل اسم المستخدم.

    ز. أدخل كلمة المرور المرتبطة باسم المستخدم.

    ح. حدد Test للتأكد من أن وقت تشغيل التكامل يمكنه الاتصال ب SQL Server. إذا لم ينجح الاتصال، فسترى علامة تحديد خضراء. إذا لم ينجح الاتصال، فسترى رسالة وجود خطأ. أصلح جميع المشكلات، وتأكد أن وقت تشغيل التكامل يتصل بـ SQL Server.

    إشعار

    دوّن القيم لنوع المصادقة والخادم وقاعدة البيانات والمستخدم وكلمة المرور. يمكنك استخدام ذلك لاحقًا في هذا البرنامج التعليمي.

إنشاء linked services

إنشاء خدمات مرتبطة في مصنع بيانات لربط مخازن بياناتك وحساب الخدمات إلى مصنع البيانات. في هذا القسم، يمكنك إنشاء خدمات مرتبطة بقاعدة بيانات SQL Server وقاعدة البيانات الخاصة بك في قاعدة بيانات azure SQL.

إنشاء الخدمة المرتبطة SQL Server

في هذه الخطوة، يمكنك ربط قاعدة بيانات SQL Server بمصنع البيانات.

  1. إنشاء ملف JSON المسمى SqlServerLinkedService.js في المجلد C:\ADFTutorials\IncCopyMultiTableTutorial (إنشاء المجلدات المحلية إذا لم تكن موجودة مسبقا) مع المحتوى التالي. حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.

    هام

    حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.

    إذا كنت تستخدم SQL المصادقة، فانسخ تعريف JSON التالي:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    إذا كنت تستخدم مصادقة Windows فانسخ تعريف JSON التالي:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    هام

    • حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.
    • استبدل < اسم وقت تشغيل التكامل > باسم وقت تشغيل التكامل.
    • استبدال <اسم الخادم> و<اسم قاعدة البيانات> و<اسم المستخدم> و<كلمة المرور> بقيم قاعدة بيانات SQL Server قبل حفظ الملف.
    • إذا كنت بحاجة إلى استخدام حرف مائل ( \ ) في حساب المستخدم أو اسم الخادم، فاستخدم حرف الهروب ( \ ). مثال على ذلك mydomain\\myuser .
  2. في PowerShell، قم بتشغيل cmdlet التالية للتبديل إلى المجلد C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. شغل cmdlet Set-AzDataFactoryV2LinkedService لإنشاء الخدمة المرتبطة: AzureStorageLinkedService. في المثال التالي، يمكنك تمرير قيم ResourceGroupName واسم معلمات DataFactory:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    فيما يلي ناتج العينة:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

إنشاء خدمة مرتبطة بقاعدة بيانات SQL

  1. إنشاء ملف JSON المسمى AzureSQLDatabaseLinkedService.js في C:\ADFTutorials\IncCopyMultiTableTutorial المجلد مع المحتوى التالي. (إنشاء المجلد ADF إذا لم يكن موجوداً بالفعل.) استبدل < اسم الخادم واسم قاعدة البيانات واسم المستخدم ><><>< وكلمة المرور باسم قاعدة بيانات SQL Server واسم > قاعدة البيانات واسم المستخدم وكلمة المرور قبل حفظ الملف.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. في PowerShell، شغل cmdlet Set-AzDataFactoryV2LinkedService لإنشاء الخدمة المرتبطة: AzureSqlDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    فيما يلي ناتج العينة:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

إنشاء datasets

في هذه الخطوة، يمكنك إنشاء مجموعات البيانات لتمثيل مصدر البيانات ووجهة البيانات والمكان لتخزين العلامة المائية.

قم بإنشاء مجموعة بيانات المصدر

  1. أنشئ ملف JSON باسم SourceDataset.json في نفس الملف بالمحتويات التالية:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    يستخدم نشاط النسخ في خط الأنابيب استعلام SQL لتحميل البيانات بدلاً من تحميل الجدول بأكمله.

  2. شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    إليك ناتج تشغيل عينة cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

أنشئ مجموعة بيانات المورد

  1. أنشئ ملف JSON باسم SinkDataset.json في نفس الملف بالمحتويات التالية. يتم تعيين عنصر tableName بواسطة خط أنابيب بشكل حيوي في وقت التشغيل. النشاط ForEach في خط الأنابيب تكرار خلال قائمة أسماء الجداول ويقوم بتمرير اسم الجدول إلى مجموعة البيانات هذه في كل تكرار.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    إليك ناتج تشغيل عينة cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

أنشئ مجموعة بيانات لهذه العلامة المائية

في هذه الخطوة، يمكنك إنشاء مجموعة بيانات لتخزين قيمة علامة مائية عالية.

  1. أنشئ ملف JSON باسم WatermarkDataset.json في نفس الملف بالمحتويات التالية:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    إليك ناتج تشغيل عينة cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

يأخذ خط أنابيب قائمة أسماء الجداول كمعلمة. النشاط ForEach يكرر من خلال قائمة أسماء الجداول وتنفيذ العمليات التالية:

  1. استخدم نشاط البحث لاسترداد قيمة العلامة المائية القديمة (القيمة الأولية أو التي تم استخدامها في التكرار الأخير).

  2. استخدم نشاط البحث لاسترداد قيمة العلامة المائية الجديدة (القيمة القصوى لعمود العلامة المائية في الجدول المصدر).

  3. استخدم نشاط النسخ لنسخ البيانات بين قيمتي العلامة المائية من قاعدة البيانات المصدر إلى قاعدة البيانات الوجهة.

  4. استخدم نشاط StoredProcedure لتحديث قيمة العلامة المائية القديمة لاستخدامها في الخطوة الأولى من التكرار التالي.

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

  1. أنشئ ملف JSON باسم ADFTutorialPipeline.json في مجلد C:\ADFGetStartedPSH بالمحتوى التالي:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. شغل Set-AzDataFactoryV2Pipeline cmdlet لإنشاء البنية الأساسية IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    فيما يلي ناتج العينة:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

قم بتشغيل البنية الأساسية

  1. إنشاء ملف معلمة المسمى Parameters.js في نفس المجلد مع المحتوى التالي:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. شغل المسار IncrementalCopyPipeline باستخدام Cmdlet Invoke-AzDataFactoryV2Pipeline. استبدل placeholders بمجموعة الموارد الخاصة بك واسم بيانات المصنع.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

مراقبة المسار

  1. قم بتسجيل الدخول إلى بوابة Azure.

  2. حدد جميع الخدمات، ابحث باستخدام الكلمة الرئيسية مصانع البيانات، وحدد مصانع البيانات.

  3. ابحث عن مصنع البيانات في قائمة مصانع البيانات، وحدده لفتح صفحة مصنع البيانات.

  4. في صفحة مصنع البيانات، حدد فتح على الإطار المتجانب فتح مصنع بيانات Azure لإطلاق Azure Data Factory في علامة تبويب منفصلة.

  5. في الصفحة الرئيسية لمصنع بيانات Azure، حدد مراقبة على الجانب الأيمن.

    تظهر لقطة الشاشة الصفحة الرئيسية لمصنع بيانات Azure.

  6. يمكنك أن ترى كل خطوط الأنابيب تعمل ووضعها. لاحظ أن في المثال التالي، يتم بنجاححالة تشغيل خط أنابيب . للتحقق من المعلمات التي تم تمريرها إلى خط الأنابيب، حدد الارتباط في العمود معلمات. إذا حدث خطأ، فسترى ارتباطاً في العمود خطأ.

    تظهر لقطة الشاشة تشغيل خط أنابيب لمصنع بيانات بما في ذلك خط الأنابيب الخاص بك.

  7. عند تحديد الارتباط في العمود الإجراءات، سترى جميع الأنشطة التي يتم تشغيلها لخط الأنابيب.

  8. للعودة إلى طريقة عرض "تشغيل خطوط الأنابيب"، حدد جميع تشغيلات خطوط الأنابيب.

مراجعة النتائج

في SQL Server Management Studio تشغيل الاستعلامات التالية مقابل قاعدة بيانات SQL الهدف للتحقق من أن تم نسخ البيانات من جداول المصدر إلى جداول الوجهة:

استفسار

select * from customer_table

الناتج

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

استفسار

select * from project_table

الناتج

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

استفسار

select * from watermarktable

الناتج

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

لاحظ أنه تم تحديث قيم العلامة المائية لكلا الجدولين.

إضافة المزيد من البيانات إلى الجداول المصدر

تشغيل الاستعلام التالي مقابل قاعدة بيانات المصدر SQL Server لتحديث صف موجود في customer_table. إدراج صف جديد في project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

أعد تشغيل التدفق

  1. الآن، أعد تشغيل خط الأنابيب بتنفيذ أمر PowerShell التالي:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. مراقبة تشغيل "تدفقات" باتباع الإرشادات الموجودة في قسم مراقبة "تدفقات" . عندما تكون حالة خط الأنابيب قيد التقدم،سترى ارتباط إجراء آخر ضمن إجراءات لإلغاء تشغيل خط الأنابيب.

  3. حدد تحديث لتحديث القائمة حتى ينجح تشغيل خط الأنابيب.

  4. اختياريا، حدد الارتباط عرض تشغيل النشاط ضمن إجراءات لمشاهدة جميع عمليات تشغيل النشاط المقترنة بتشغيل خط الأنابيب هذا.

راجع النتائج التالية

في SQL Server Management Studio تشغيل الاستعلامات التالية مقابل قاعدة البيانات الهدف للتحقق من أن تم نسخ البيانات المحدثة/الجديدة من جداول المصدر إلى جداول الوجهة.

استفسار

select * from customer_table

الناتج

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

لاحظ القيم الجديدة من الاسم وLastModifytime لـPersonID رقم 3.

استفسار

select * from project_table

الناتج

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

لاحظ أنه تمت إضافة إدخال مشروع جديد إلى project_table.

استفسار

select * from watermarktable

الناتج

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

لاحظ أنه تم تحديث قيم العلامة المائية لكلا الجدولين.

نفّذت الخطوات التالية في هذا البرنامج التعليمي:

  • إعداد مخازن بيانات المصدر والوجهة.
  • إنشاء data factory.
  • إنشاء وقت تشغيل تكامل مستضاف ذاتياً (IR).
  • ثبت أداة وقت تشغيل التكامل.
  • أنشئ الخدمة ذات الصلة.
  • أنشئ مجموعات بيانات المورد والمصدر والعلامة المائية.
  • إنشاء خط أنابيب وتشغيله ومراقبة.
  • راجع النتائج.
  • إضافة بيانات أو تحديثها في جداول المصدر.
  • مراقبة تشغيل المسار.
  • راجع النتائج التالية.

انتقل إلى البرنامج التعليمي التالي لمعرفة المزيد حول تحويل البيانات باستخدام مجموعة Spark على Azure: