تحميل البيانات تدريجياً من جداول متعددة في SQL Server إلى قاعدة بيانات Azure SQL
ينطبق على: Azure Data Factory Azure Synapse Analytics
تلميح
جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!
في هذا البرنامج التعليمي، يمكنك إنشاء مصنع بيانات Azure مع خط أنابيب الذي يحمل بيانات دلتا من جداول متعددة في قاعدة بيانات SQL Server إلى قاعدة بيانات SQL Azure.
نفذ الخطوات التالية في هذا البرنامج التعليمي:
- إعداد مخازن بيانات المصدر والوجهة.
- إنشاء data factory.
- إنشاء وقت تشغيل تكامل الاستضافة الذاتية.
- ثبت أداة وقت تشغيل التكامل.
- أنشئ الخدمة ذات الصلة.
- أنشئ مجموعات بيانات المورد والمصدر والعلامة المائية.
- إنشاء خط أنابيب وتشغيله ومراقبة.
- راجع النتائج.
- إضافة بيانات أو تحديثها في جداول المصدر.
- مراقبة تشغيل المسار.
- راجع النتائج التالية.
نظرة عامة
فيما يلي الخطوات الهامة لإنشاء هذا الحل:
حدد عمود العلامة المائية.
حدد عمودا واحدا لكل جدول في مخزن البيانات المصدر، والذي يمكنك تحديد السجلات الجديدة أو المحدثة لكل تشغيل. عادةً ما تستمر البيانات الموجودة في هذا العمود المحدد في الزيادة (على سبيل المثال، last_modify_time أو معرف) عند إنشاء صفوف أو تحديثها. تستخدم أعلى قيمة في هذا العمود كعلامة مائية.
أعد مخزن بيانات لتخزين قيمة العلامة المائية.
في هذا البرنامج التعليمي، يمكنك تخزين قيمة العلامة المائية في قاعدة بيانات SQL.
إنشاء خط أنابيب مع الأنشطة التالية:
إنشاء نشاط ForEach الذي يبتكر من خلال قائمة أسماء الجداول المصدر التي يتم تمريرها كمعلمة إلى خط الأنابيب. لكل جدول مصدر استدعاء الأنشطة التالية لتنفيذ تحميل دلتا لهذا الجدول.
إنشاء نشاطين من أنشطة البحث. استخدم نشاط البحث الأول لاسترداد قيمة العلامة المائية الأخيرة. استخدم نشاط البحث الثاني لاسترداد قيمة العلامة المائية الجديدة. يتم تمرير قيم هذه العلامات المائية إلى نشاط النسخ.
أنشئ نشاط نسخ ينسخ صفوف من مخزن بيانات المصدر مع إدخال قيمة لعمود العلامة المائية أكبر من قيمة العلامة المائية القديمة وأقل من قيمة العلامة المائية الجديدة أو يساويها. بعد ذلك، ينسخ بيانات "دلتا" من مخزن بيانات المصدر إلى تخزين Azure Blob كملف جديد.
أنشئ نشاط StoredProcedure الذي يحدث قيمة العلامة المائية للمسار الذي سيتم تشغيله في المرة القادمة.
فيما يلي رسم تخطيطي لحل رفيع المستوى:
في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
المتطلبات الأساسية
- SQL Server. يمكنك استخدام قاعدة بيانات SQL Server كمخزن البيانات المصدر في هذا البرنامج التعليمي.
- Azure SQL Database. يمكنك استخدام قاعدة بيانات في قاعدة بيانات SQL Azure كمخزن بيانات مصدر. إذا لم تكن لديك قاعدة بيانات SQL، فراجع إنشاء قاعدة بيانات في Azure SQL Database لمعرفة خطوات إنشاء قاعدة بيانات أخرى.
إنشاء جداول المصدر في قاعدة بيانات SQL Server
افتح SQL Server Management Studio (SSMS) أو Azure Data Studio، ثم اتصل بقاعدة بيانات SQL Server.
في مستكشف الملقم (SSMS) أو في جزء الاتصالات (Azure Data Studio)، انقر بزر الماوس الأيمن فوق قاعدة البيانات واختر استعلام جديد.
تشغيل الأمر SQL التالية مقابل قاعدة البيانات لإنشاء جداول
customer_table
المسماةproject_table
و:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
إنشاء جداول الوجهة في قاعدة بيانات SQL Azure
افتح SQL Server Management Studio (SSMS) أو Azure Data Studio، ثم اتصل بقاعدة بيانات SQL Server.
في مستكشف الملقم (SSMS) أو في جزء الاتصالات (Azure Data Studio)، انقر بزر الماوس الأيمن فوق قاعدة البيانات واختر استعلام جديد.
تشغيل الأمر SQL التالية مقابل قاعدة البيانات لإنشاء جداول
customer_table
المسماةproject_table
و:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
أنشئ جدول أخر في قاعدة بيانات SQL لتخزين أعلى قيمة للعلامة المائية
شغل أمر SQL التالي مقابل قاعدة بياناتك في SQL لإنشاء جدول باسم
watermarktable
لتخزين قيمة العلامة المائية:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
إدراج قيم العلامة المائية الأولية لكلا الجدولين المصدرين في جدول العلامة المائية.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
إنشاء إجراء مخزن في قاعدة بيانات SQL Azure
تشغيل الأمر التالي لإنشاء إجراء مخزن في قاعدة البيانات الخاصة بك. هذا الإجراء المخزن بتحديث قيمة العلامة المائية بعد تشغيل كل خط أنابيب.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
إنشاء أنواع البيانات والإجراءات المخزنة الإضافية في قاعدة بيانات azure SQL
تشغيل الاستعلام التالي لإنشاء اثنين من الإجراءات المخزنة ونوعي بيانات في قاعدة البيانات الخاصة بك. يتم استخدامها لدمج البيانات من جداول المصدر في جداول الوجهة.
من أجل جعل الرحلة سهلة لتبدأ، ونحن نستخدم مباشرة هذه الإجراءات المخزنة تمرير البيانات دلتا في عبر متغير الجدول ومن ثم دمج لهم في مخزن الوجهة. كن حذرا فإنه لا تتوقع عدد "كبير" من صفوف دلتا (أكثر من 100) ليتم تخزينها في متغير الجدول.
إذا كنت بحاجة إلى دمج عدد كبير من صفوف "دلتا" في المتجر الوجهة، فإننا نقترح عليك استخدام نشاط النسخ لنسخ جميع بيانات "دلتا" إلى جدول "مرحلي" مؤقت في المتجر الوجهة أولاً، ثم إنشاء الإجراء المخزن الخاص بك بدون استخدام متغير الجدول لدمجها من جدول "التدريج" إلى الجدول "النهائي".
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
تثبيت أحدث الوحدات النمطية Azure PowerShell باتباع الإرشادات الموجودة في تثبيت وتكوين Azure PowerShell.
إنشاء مصدرًا للبيانات
حدد متغيراً لاسم مجموعة الموارد الذي ستستخدمه لاحقاً في أوامر PowerShell. انسخ الأمر النصي التالي إلى PowerShell، حدد اسماً لمجموعة موارد Azure وأحطها بعلامات اقتباس مزدوجة، ثم شغل الأمر. مثال على ذلك
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
إذا كانت مجموعة الموارد موجودة بالفعل، فقد لا ترغب في الكتابة فوقها. عين قيمة مختلفة
$resourceGroupName
للمتغير وشغل الأمر مرة أخرى.حدد متغير لموقع بيانات المصنع.
$location = "East US"
لإنشاء مجموعة موارد Azure، شغل الأمر التالي:
New-AzResourceGroup $resourceGroupName $location
إذا كانت مجموعة الموارد موجودة بالفعل، فقد لا ترغب في الكتابة فوقها. عين قيمة مختلفة
$resourceGroupName
للمتغير وشغل الأمر مرة أخرى.حدد متغير لموقع بيانات المصنع.
هام
حدث اسم مصنع البيانات باسم عمومي فريد. مثال على ذلك هو ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";
لإنشاء بيانات المصنع شغل التالي Set-AzDataFactoryV2 cmdlet:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
لاحظ النقاط التالية:
يجب أن يكون اسم مصنع البيانات مميزًا وعامًا. إذا استلمت الخطأ التالي، فغير الاسم وحاول مرة أخرى:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
لإنشاء مثيلات لبيانات المصنع، يجب أن يكون حساب المستخدم الذي تستخدمه لتسجيل الدخول إلى Azure مشتركاً في Azure أو له دول المالك أو مسؤول الاشتراك في Azure.
للحصول على قائمة بمناطق Azure التي يتوفر فيها حالياً Data Factory، حدد المناطق التي تهمك في الصفحة التالية، ثم قم بتوسيع "Analytics" لتحديد موقع Data Factory: "Products available by region". يمكن أن تكون مخازن البيانات (Storage، SQL Database، Azure SQL Managed Instance، وما إلى ذلك) وحسابات (Azure HDInsight وما إلى ذلك) التي يستخدمها data factory في مناطق أخرى.
إنشاء وقت تشغيل تكامل مستضاف ذاتيا
في هذا القسم، يمكنك إنشاء وقت تشغيل تكامل ذاتي الاستضافة وربطه بجهاز محلي مع قاعدة بيانات SQL Server. وقت تشغيل تكامل الاستضافة الذاتية هو المكون الذي ينسخ البيانات من SQL Server على جهازك إلى Azure SQL Database.
إنشاء متغير لاسم وقت تشغيل التكامل. استخدم اسمًا فريدًا، ودوّن ملاحظة خاصة به. يمكن استخدامه في وقت لاحق في هذا البرنامج التعليمي.
$integrationRuntimeName = "ADFTutorialIR"
إنشاء وقت تشغيل تكامل الاستضافة الذاتية.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
فيما يلي ناتج العينة:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
لاسترداد حالة وقت تشغيل التكامل الذي تم إنشاؤه، شغّل الأمر التالي. تأكد من تعيين قيمة خاصية State إلى NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
فيما يلي ناتج العينة:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
لاسترداد مفاتيح المصادقة المُستخدمة في تسجيل وقت تشغيل تكامل الاستضافة الذاتية مع خدمة Azure Data Factory في مجموعة النظراء، شغّل الأمر التالي:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
فيما يلي ناتج العينة:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }
انسخ أحد المفاتيح (باستثناء علامات الاقتباس المزدوجة) المُستخدمة لتسجيل وقت تشغيل تكامل الاستضافة الذاتية الذي قمت بتثبيته على جهازك في الخطوات التالية.
ثبت أداة وقت تشغيل التكامل
إذا كان لديك بالفعل وقت تشغيل التكامل على جهازك، فقم بإلغاء تثبيته باستخدام إضافة برامج أو إزالتها.
قم بتنزيل وقت تشغيل التكامل المستضاف ذاتيا على جهاز Windows محلي. شغّل التثبيت.
في صفحة Welcome to Microsoft Integration Runtime Setup ، حدد Next.
في صفحة اتفاقية ترخيص المستخدم النهائي، اقبل الشروط واتفاقية الترخيص، وحدد التالي.
في صفحة مجلد الوجهة، حدد التالي.
في صفحة Ready to install Microsoft Integration Runtime ، حدد Install.
في صفحة Completed the Microsoft Integration Runtime Setup ، حدد Finish.
في صفحة Register Integration Runtime (Self-hosted)، الصق المفتاح الذي حفظته في القسم السابق، وحدد Register.
في صفحة New Integration Runtime (Self-hosted) Node ، حدد Finish.
ستظهر الرسالة التالية عند تسجيل وقت تشغيل تكامل الاستضافة الذاتية بنجاح:
في صفحة Register Integration Runtime (Self-hosted)، حدد Launch Configuration Manager.
عند توصيل العقدة بخدمة مجموعة النظراء، سترى الصفحة التالية:
الآن، اختبر الاتصال بقاعدة بيانات SQL Server.
أ. في صفحة Configuration Manager ، انتقل إلى علامة التبويب Diagnostics .
ب. حدد SqlServer لنوع مصدر البيانات.
جـ. أدخل اسم الخادم.
د. أدخل اسم قاعدة البيانات.
هـ. حدد وضع المصادقة.
و. أدخل اسم المستخدم.
ز. أدخل كلمة المرور المرتبطة باسم المستخدم.
ح. حدد Test للتأكد من أن وقت تشغيل التكامل يمكنه الاتصال ب SQL Server. إذا لم ينجح الاتصال، فسترى علامة تحديد خضراء. إذا لم ينجح الاتصال، فسترى رسالة وجود خطأ. أصلح جميع المشكلات، وتأكد أن وقت تشغيل التكامل يتصل بـ SQL Server.
إشعار
دوّن القيم لنوع المصادقة والخادم وقاعدة البيانات والمستخدم وكلمة المرور. يمكنك استخدام ذلك لاحقًا في هذا البرنامج التعليمي.
إنشاء linked services
إنشاء خدمات مرتبطة في مصنع بيانات لربط مخازن بياناتك وحساب الخدمات إلى مصنع البيانات. في هذا القسم، يمكنك إنشاء خدمات مرتبطة بقاعدة بيانات SQL Server وقاعدة البيانات الخاصة بك في قاعدة بيانات azure SQL.
إنشاء الخدمة المرتبطة SQL Server
في هذه الخطوة، يمكنك ربط قاعدة بيانات SQL Server بمصنع البيانات.
إنشاء ملف JSON المسمى SqlServerLinkedService.js في المجلد C:\ADFTutorials\IncCopyMultiTableTutorial (إنشاء المجلدات المحلية إذا لم تكن موجودة مسبقا) مع المحتوى التالي. حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.
هام
حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.
إذا كنت تستخدم SQL المصادقة، فانسخ تعريف JSON التالي:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
إذا كنت تستخدم مصادقة Windows فانسخ تعريف JSON التالي:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
هام
- حدد المقطع الأيمن استناداً إلى المصادقة التي تستخدمها للاتصال SQL Server.
- استبدل < اسم وقت تشغيل التكامل > باسم وقت تشغيل التكامل.
- استبدال <اسم الخادم> و<اسم قاعدة البيانات> و<اسم المستخدم> و<كلمة المرور> بقيم قاعدة بيانات SQL Server قبل حفظ الملف.
- إذا كنت بحاجة إلى استخدام حرف مائل (
\
) في حساب المستخدم أو اسم الخادم، فاستخدم حرف الهروب (\
). مثال على ذلكmydomain\\myuser
.
في PowerShell، قم بتشغيل cmdlet التالية للتبديل إلى المجلد C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
شغل cmdlet Set-AzDataFactoryV2LinkedService لإنشاء الخدمة المرتبطة: AzureStorageLinkedService. في المثال التالي، يمكنك تمرير قيم ResourceGroupName واسم معلمات DataFactory:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
فيما يلي ناتج العينة:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
إنشاء خدمة مرتبطة بقاعدة بيانات SQL
إنشاء ملف JSON المسمى AzureSQLDatabaseLinkedService.js في C:\ADFTutorials\IncCopyMultiTableTutorial المجلد مع المحتوى التالي. (إنشاء المجلد ADF إذا لم يكن موجوداً بالفعل.) استبدل < اسم الخادم واسم قاعدة البيانات واسم المستخدم ><><>< وكلمة المرور باسم قاعدة بيانات SQL Server واسم > قاعدة البيانات واسم المستخدم وكلمة المرور قبل حفظ الملف.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }
في PowerShell، شغل cmdlet Set-AzDataFactoryV2LinkedService لإنشاء الخدمة المرتبطة: AzureSqlDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
فيما يلي ناتج العينة:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
إنشاء datasets
في هذه الخطوة، يمكنك إنشاء مجموعات البيانات لتمثيل مصدر البيانات ووجهة البيانات والمكان لتخزين العلامة المائية.
قم بإنشاء مجموعة بيانات المصدر
أنشئ ملف JSON باسم SourceDataset.json في نفس الملف بالمحتويات التالية:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }
يستخدم نشاط النسخ في خط الأنابيب استعلام SQL لتحميل البيانات بدلاً من تحميل الجدول بأكمله.
شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
إليك ناتج تشغيل عينة cmdlet:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
أنشئ مجموعة بيانات المورد
أنشئ ملف JSON باسم SinkDataset.json في نفس الملف بالمحتويات التالية. يتم تعيين عنصر tableName بواسطة خط أنابيب بشكل حيوي في وقت التشغيل. النشاط ForEach في خط الأنابيب تكرار خلال قائمة أسماء الجداول ويقوم بتمرير اسم الجدول إلى مجموعة البيانات هذه في كل تكرار.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }
شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
إليك ناتج تشغيل عينة cmdlet:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
أنشئ مجموعة بيانات لهذه العلامة المائية
في هذه الخطوة، يمكنك إنشاء مجموعة بيانات لتخزين قيمة علامة مائية عالية.
أنشئ ملف JSON باسم WatermarkDataset.json في نفس الملف بالمحتويات التالية:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
شغل Set-AzDataFactoryV2Dataset cmdlet لإنشاء مجموعة بيانات WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
إليك ناتج تشغيل عينة cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية
يأخذ خط أنابيب قائمة أسماء الجداول كمعلمة. النشاط ForEach يكرر من خلال قائمة أسماء الجداول وتنفيذ العمليات التالية:
استخدم نشاط البحث لاسترداد قيمة العلامة المائية القديمة (القيمة الأولية أو التي تم استخدامها في التكرار الأخير).
استخدم نشاط البحث لاسترداد قيمة العلامة المائية الجديدة (القيمة القصوى لعمود العلامة المائية في الجدول المصدر).
استخدم نشاط النسخ لنسخ البيانات بين قيمتي العلامة المائية من قاعدة البيانات المصدر إلى قاعدة البيانات الوجهة.
استخدم نشاط StoredProcedure لتحديث قيمة العلامة المائية القديمة لاستخدامها في الخطوة الأولى من التكرار التالي.
إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية
أنشئ ملف JSON باسم ADFTutorialPipeline.json في مجلد C:\ADFGetStartedPSH بالمحتوى التالي:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }
شغل Set-AzDataFactoryV2Pipeline cmdlet لإنشاء البنية الأساسية IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
فيما يلي ناتج العينة:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
قم بتشغيل البنية الأساسية
إنشاء ملف معلمة المسمى Parameters.js في نفس المجلد مع المحتوى التالي:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }
شغل المسار IncrementalCopyPipeline باستخدام Cmdlet Invoke-AzDataFactoryV2Pipeline. استبدل placeholders بمجموعة الموارد الخاصة بك واسم بيانات المصنع.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
مراقبة المسار
قم بتسجيل الدخول إلى بوابة Azure.
حدد جميع الخدمات، ابحث باستخدام الكلمة الرئيسية مصانع البيانات، وحدد مصانع البيانات.
ابحث عن مصنع البيانات في قائمة مصانع البيانات، وحدده لفتح صفحة مصنع البيانات.
في صفحة مصنع البيانات، حدد فتح على الإطار المتجانب فتح مصنع بيانات Azure لإطلاق Azure Data Factory في علامة تبويب منفصلة.
في الصفحة الرئيسية لمصنع بيانات Azure، حدد مراقبة على الجانب الأيمن.
يمكنك أن ترى كل خطوط الأنابيب تعمل ووضعها. لاحظ أن في المثال التالي، يتم بنجاححالة تشغيل خط أنابيب . للتحقق من المعلمات التي تم تمريرها إلى خط الأنابيب، حدد الارتباط في العمود معلمات. إذا حدث خطأ، فسترى ارتباطاً في العمود خطأ.
عند تحديد الارتباط في العمود الإجراءات، سترى جميع الأنشطة التي يتم تشغيلها لخط الأنابيب.
للعودة إلى طريقة عرض "تشغيل خطوط الأنابيب"، حدد جميع تشغيلات خطوط الأنابيب.
مراجعة النتائج
في SQL Server Management Studio تشغيل الاستعلامات التالية مقابل قاعدة بيانات SQL الهدف للتحقق من أن تم نسخ البيانات من جداول المصدر إلى جداول الوجهة:
استفسار
select * from customer_table
الناتج
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
استفسار
select * from project_table
الناتج
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
استفسار
select * from watermarktable
الناتج
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
لاحظ أنه تم تحديث قيم العلامة المائية لكلا الجدولين.
إضافة المزيد من البيانات إلى الجداول المصدر
تشغيل الاستعلام التالي مقابل قاعدة بيانات المصدر SQL Server لتحديث صف موجود في customer_table. إدراج صف جديد في project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
أعد تشغيل التدفق
الآن، أعد تشغيل خط الأنابيب بتنفيذ أمر PowerShell التالي:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
مراقبة تشغيل "تدفقات" باتباع الإرشادات الموجودة في قسم مراقبة "تدفقات" . عندما تكون حالة خط الأنابيب قيد التقدم،سترى ارتباط إجراء آخر ضمن إجراءات لإلغاء تشغيل خط الأنابيب.
حدد تحديث لتحديث القائمة حتى ينجح تشغيل خط الأنابيب.
اختياريا، حدد الارتباط عرض تشغيل النشاط ضمن إجراءات لمشاهدة جميع عمليات تشغيل النشاط المقترنة بتشغيل خط الأنابيب هذا.
راجع النتائج التالية
في SQL Server Management Studio تشغيل الاستعلامات التالية مقابل قاعدة البيانات الهدف للتحقق من أن تم نسخ البيانات المحدثة/الجديدة من جداول المصدر إلى جداول الوجهة.
استفسار
select * from customer_table
الناتج
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
لاحظ القيم الجديدة من الاسم وLastModifytime لـPersonID رقم 3.
استفسار
select * from project_table
الناتج
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
لاحظ أنه تمت إضافة إدخال مشروع جديد إلى project_table.
استفسار
select * from watermarktable
الناتج
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
لاحظ أنه تم تحديث قيم العلامة المائية لكلا الجدولين.
المحتوى ذو الصلة
نفّذت الخطوات التالية في هذا البرنامج التعليمي:
- إعداد مخازن بيانات المصدر والوجهة.
- إنشاء data factory.
- إنشاء وقت تشغيل تكامل مستضاف ذاتياً (IR).
- ثبت أداة وقت تشغيل التكامل.
- أنشئ الخدمة ذات الصلة.
- أنشئ مجموعات بيانات المورد والمصدر والعلامة المائية.
- إنشاء خط أنابيب وتشغيله ومراقبة.
- راجع النتائج.
- إضافة بيانات أو تحديثها في جداول المصدر.
- مراقبة تشغيل المسار.
- راجع النتائج التالية.
انتقل إلى البرنامج التعليمي التالي لمعرفة المزيد حول تحويل البيانات باستخدام مجموعة Spark على Azure: