البرنامج النصي لتدفق البيانات (DFS)

ينطبق على:Azure Data Factory Azure Synapse Analytics

تلميح

جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!

تتوفر تدفقات البيانات في كل من Azure Data Factory وخطوط أنابيب Azure Synapse. تنطبق هذه المقالة على تعيين تدفقات البيانات. إذا كنت جديداً في مجال التحويلات، فيرجى الرجوع إلى المقالة التمهيدية تحويل البيانات باستخدام تدفق بيانات التعيين.

البرنامج النصي لتدفق البيانات (DFS) هو بيانات التعريف الأساسية، وهو يشبه لغة الترميز، التي يتم استخدامها لتنفيذ التحويلات المضمنة في تدفق بيانات التعيين. يتم تمثيل كل تحويل بواسطة سلسلة من الخصائص التي توفر المعلومات الضرورية لتشغيل المهمة بشكل صحيح. البرنامج النصي مرئي وقابل للتحرير من ملف تعريف التطبيق (ADF) من خلال النقر على زر «script» على الشريط العلوي من واجهة مستخدم المستعرض.

Script button

على سبيل المثال، يتولى allowSchemaDrift: true, في تحويل المصدر إبلاغ الخدمة لتضمين جميع الأعمدة من مجموعة البيانات المصدر في تدفق البيانات حتى إذا لم تكن مضمنة في إسقاط المخطط.

حالات الاستخدام

يتم إنتاج DFS تلقائيًّا بواسطة واجهة المستخدم. يمكنك النقر فوق الزر "برنامج نصي" لعرض البرنامج النصي وتخصيصه. يمكنك أيضاً إنشاء البرامج النصية خارج واجهة مستخدم ADF ثم تمرير ذلك إلى PowerShell cmdlet. عند تصحيح أخطاء تدفقات البيانات المعقدة، قد تجد أنه من الأسهل مسح التعليمات البرمجية في الخلف للبرنامج النصي بدلاً من مسح التمثيل البياني عبر واجهة المستخدم للتدفقات لديك.

فيما يلي بعض أمثلة حالات الاستخدام:

  • الإنتاج البرمجي لكثير من تدفقات البيانات التي تكون متشابهة إلى حد ما، أي "القضاء على" تدفقات البيانات.
  • التعبيرات المعقدة التي تصعب إدارتها في واجهة المستخدم أو التي تنتج عنها مشكلات في التحقق من الصحة.
  • تصحيح الأخطاء المتعددة التي نتجت أثناء التنفيذ وفهمها بشكل أفضل.

عند إنشاء برنامج نصي لتدفق بيانات لاستخدامه مع PowerShell أو على واجهة برمجة التطبيقات، يجب طي النص المنسق إلى سطر واحد. يمكنك الاحتفاظ علامات التبويب وخطوط جديدة كأحرف إلغاء. ولكن يجب تنسيق النص ليتم استيعابه ضمن خاصية JSON. يوجد زر على واجهة مستخدم محرر البرنامج النصي في الجزء السفلي، مسؤول عن تنسيق البرنامج النصي كسطر واحد لأجلك.

Copy button

طريقة إضافة التحويلات

تتطلب إضافة التحويلات ثلاث خطوات أساسية: إضافة بيانات التحويل الأساسية، وإعادة توجيه تدفق المدخلات، ثم إعادة توجيه تدفق الإنتاج. ويمكن رؤية ذلك بسهولة أكبر في مثال. لنفترض أننا نبدأ بمصدر بسيط لتلقي تدفق البيانات كما يلي:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

إذا قررنا إضافة تحويل مشتق، فأولاً سنكون بحاجة إلى إنشاء نص التحويل الأساسي الذي يحتوي على تعبير بسيط لإضافة عمود جديد بأحرف كبيرة يسمى upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

ونأخذ بعد ذلك DFS الموجود ونضيف التحويل:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

والآن نقوم بإعادة توجيه دفق البيانات الوارد من خلال تحديد التحويل الذي نريد أن يأتي التحويل الجديد بعده (إنه source1 في هذه الحالة) ونسخ اسم التدفق إلى التحويل الجديد:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

وأخيرًا، نحدد التحويل الذي نريد أن يأتي بعد هذا التحويل الجديد، ونستبدل تدفق المدخلات الخاص به (وهو sink1 في هذه الحالة) باسم تدفق الإنتاج الخاص بتحويلنا الجديد:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

أساسيات DFS

يتكون DFS من سلسلة من التحويلات المتصلة، بما في ذلك المصادر، والمتلقين، وغيرها من التحويلات المختلفة التي يمكن أن تضيف أعمدة جديدة، وأن تصفي البيانات، وتضم البيانات، وأكثر من ذلك بكثير. عادة، سيبدأ البرنامج النصي بمصدر واحد أو أكثر متبوعًا بالعديد من التحويلات وينتهي بواحد أو أكثر من المتلقين.

كل المصادر لديها البناء الأساسي نفسه:

source(
  source properties
) ~> source_name

على سبيل المثال، مصدر بسيط به ثلاثة أعمدة (معرّف الفيلم، والعنوان، والأنواع) سيكون:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

جميع التحويلات بخلاف المصادر التي تتميز بالبناء الأساسي نفسه:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

على سبيل المثال، تحويل مشتق بسيط يأخذ عمودًا (عنوانًا) ويكتب فوقه بإصدار أحرف كبيرة سيكون كما يلي:

source1 derive(
  title = upper(title)
) ~> derive1

يكون المتلقي بدون مخطط كما يلي:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

قصاصات برمجية من البرنامج النصي

القصاصات البرمجية من البرنامج النصي هي تعليمة برمجية قابلة للمشاركة من برنامج نصي لتدفق بيانات يمكنك استخدامها للمشاركة عبر تدفقات البيانات. يتحدث هذا الفيديو أدناه عن كيفية استخدام القصاصات البرمجية من البرنامج النصي واستخدام البرنامج النصي لتدفق البيانات لنسخ ولصق أجزاء من البرنامج النصي خلف الرسوم البيانية لتدفق البيانات:

ملخص الإحصائيات المجمعة

قم بإضافة تحويل مجمّع يسمى "SummaryStats" إلى تدفق البيانات الخاص بك، ثم قم بلصق هذه التعليمة البرمجية أدناه للدالة المجمّعة في برنامجك النصي، ما يؤدي إلى استبدال SummaryStats الموجود. وسيوفر ذلك نمطًا عامًّا لملخص إحصائيات ملف تعريف البيانات.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

يمكنك أيضاً استخدام العينة أدناه لحساب عدد الصفوف الفريدة وعدد الصفوف المميزة في بياناتك. يمكن لصق المثال أدناه في تدفق بيانات يشتمل على تحويل مجمع يُسمى ValueDistAgg. يستخدم هذا المثال عمود يُسمى "العنوان". تأكد من استبدال "العنوان" بعمود السلسلة في البيانات التي ترغب في استخدامها للحصول على أعداد القيم.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

تضمين كل الأعمدة في وحدة مجمّعة

هذا نمط عام للتجميع يوضح كيف يمكنك إبقاء الأعمدة المتبقية في بيانات تعريف الإخراج عند إنشاء التجميعات. في هذه الحال، نستخدم الدالة first() لاختيار القيمة الأولى في كل عمود لا يحمل اسم "فيلم". لاستخدام هذا، قم بإنشاء تحويل مجمّع يسمى DistinctRows ثم الصق هذا في البرنامج النصي الخاص بك فوق البرنامج النصي المجمّع DistinctRows الموجود.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

إنشاء بصمة تجزئة صف

استخدم هذه التعليمة البرمجية في برنامج نصي لتدفق البيانات لإنشاء عمود مشتق جديد يسمى DWhash والذي يؤدي إلى إنشاء تجزئة sha1 من ثلاثة أعمدة.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

يمكنك أيضاً استخدام هذا البرنامج النصي أدناه لإنشاء تجزئة صفوف باستخدام كل الأعمدة الموجودة في دفق البيانات، دون الحاجة إلى تسمية كل عمود:

derive(DWhash = sha1(columns())) ~> DWHash

مكافئ String_agg

ستعمل هذه التعليمات البرمجية مثل الدالة T-SQL string_agg() وستقوم بتجميع قيم السلسلة في صفيف. يمكنك بعد ذلك تحويل الصفيف إلى سلسلة لاستخدامها مع وجهات SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

إحصاء عدد التحديثات وعمليات الإدماج والإدراج والحذف

عند استخدام تحويل "الصف البديل"، قد تحتاج إلى حساب عدد التحديثات، وعمليات الإدماج والإدراج والحذف التي تنتج من نهج "الصف البديل". قم بإضافة تحويل مجمّع بعد صف بديل والصق هذا البرنامج النصي لتدفق البيانات في التعريف المجمّع لهذه الأعداد.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

صف مميز باستخدام كل الأعمدة

تضيف هذه القصاصة البرمجية تحويلاً مجمّعًا جديدًا إلى تدفق البيانات، يأخذ جميع الأعمدة الواردة وينشئ تجزئة تُستخدم للتجميع من أجل إزالة التكرارات، ثم يوفر أول حدوث لكل تكرار كإخراج. لست بحاجة إلى تسمية الأعمدة تسميةً صريحة، حيث تُنشأ تلقائياً من دفق البيانات الواردة.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

التحقق من وجود أحرف فارغة في كل الأعمدة

هذه عبارة عن قصاصة برمجية يمكنك لصقها في تدفق البيانات للتحقق بشكل عام من كل الأعمدة بحث عن قيم أحرف فارغة. تستفيد هذه التقنية من حركة المخطط للبحث خلال كل الأعمدة في كل الصفوف وتستخدم تقسيمًا مشروطًا لفصل الصفوف التي تشتمل على أحرف فارغة من الصفوف التي لا تشتمل عليها.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

التعيين التلقائي لحركة المخطط من خلال التحديد

عندما تحتاج إلى تحميل مخطط قاعدة بيانات موجود من مجموعة غير معروفة أو ديناميكية من الأعمدة الواردة، يجب تعيين أعمدة الجانب الأيمن في تحويل المتلقي. ويكون هذا مطلوباً فقط أثناء تحميل جدول موجود. أضف هذه القصاصة البرمجية قبل "المتلقي" لديك لإنشاء تحديد يقوم بتعيين الأعمدة تلقائيًّا. اترك تعيين "المتلقي" للتعيين التلقائي.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

أنواع بيانات الأعمدة المستمرة

قم بإضافة هذا البرنامج النصي داخل تعريف عمود مشتق لتخزين أسماء الأعمدة وأنواع البيانات من تدفق البيانات إلى مخزن مستمر باستخدام المتلقي.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

التعبئة للأسفل

إليك كيفية تنفيذ مشكلة "التعبئة للأسفل" الشائعة باستخدام مجموعات البيانات عندما تريد استبدال القيم الخالية بالقيمة المُستخلصة من القائمة غير الخالية السابقة في التسلسل. لاحظ أن هذه العملية يمكن أن تكون لها آثار سلبية على الأداء لأنه يجب عليك إنشاء نافذة صناعية عبر مجموعة البيانات الكاملة التي تشتمل على قيمة فئة عنصر "وهمي". بالإضافة إلى ذلك، يجب عليك الفرز حسب القيمة لإنشاء تسلسل البيانات المناسب للبحث عن القيمة السابقة التي لا تشتمل على أحرف فارغة. تقوم هذه القصاصة البرمجية أدناه بإنشاء الفئة الاصطناعية كعنصر "وهمي" وفرزها بواسطة مفتاح بديل. يمكنك إزالة المفتاح البديل واستخدام مفتاح الفرز الخاص بالبيانات لديك. تفترض القصاصة البرمجية من التعليمة البرمجية هذه أنك أضفت بالفعل تحويلاً مصدرًا يسمى source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

معدل النقل

يمكن تنفيذ معدل النقل بسهولة كبيرة في تدفقات البيانات باستخدام تحويل Windows. ينشئ هذا المثال أدناه معدل نقل لأسعار الأسهم لـ Microsoft لمدة 15 يومًا.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

العدد المميز لكل قيم الأعمدة

يمكنك استخدام هذا البرنامج النصي لتحديد أعمدة المفاتيح وعرض أهمية كل الأعمدة في دفق البيانات باستخدام قصاصة برمجية من برنامج نصي واحد. أضف هذا البرنامج النصي كتحويل مجمّع لتدفق البيانات لديك، وسوف يوفر تلقائيًّا الأعداد المميزة لكل الأعمدة.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

مقارنة قيم الصف السابقة أو التالية

يوضح نموذج القصاصة البرمجية هذا كيف يمكن استخدام تحويل النافذة لمقارنة قيم الأعمدة من سياق الصف الحالي بقيم الأعمدة من الصفوف قبل الصف الحالي وبعده. في هذا المثال، يتم استخدام عمود مشتق لإنشاء قيمة وهمية لتمكين قسم نافذة عبر مجموعة البيانات بأكملها. يتم استخدام تحويل مفتاح بديل لتعيين قيمة مفتاح فريدة لكلّ صف. عند تطبيق هذا النمط على تحويلات البيانات لديك، يمكنك إزالة المفتاح البديل إذا كان ما تريد الطلب باستخدامه هو عمود، كما يمكنك إزالة العمود المشتق إذا وُجدت لديك أعمدة يمكن استخدامها في تقسيم بياناتك.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

كم عدد الأعمدة الموجودة في بياناتي؟

size(array(columns()))

استكشاف تدفقات البيانات من خلال البدء بمقالة نظرة عامة على تدفقات البيانات