إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
توفر هذه المقالة أمثلة على التعليمات البرمجية وشرحا للمفاهيم الأساسية اللازمة لتشغيل استعلامات الدفق المنظم الأولى على Azure Databricks. يمكنك استخدام Structured Streaming لأحمال عمل المعالجة في الوقت الفعلي تقريبا والتزايدية.
الدفق المنظم هو واحد من العديد من التقنيات التي تعمل على تشغيل جداول الدفق في Delta Live Tables. توصي Databricks باستخدام Delta Live Tables لجميع أحمال عمل ETL والابتلاع والتدفق المنظم الجديدة. راجع ما هي جداول Delta Live؟.
إشعار
بينما توفر Delta Live Tables بناء جملة معدلا قليلا للإعلان عن جداول الدفق، ينطبق بناء الجملة العام لتكوين قراءات البث والتحويلات على جميع حالات استخدام الدفق على Azure Databricks. تعمل Delta Live Tables أيضا على تبسيط الدفق من خلال إدارة معلومات الحالة وبيانات التعريف والتكوينات العديدة.
استخدام "المحمل التلقائي" لقراءة البيانات المتدفقة من تخزين الكائنات
يوضح المثال التالي تحميل بيانات JSON مع Auto Loader، والذي يستخدم cloudFiles للإشارة إلى التنسيق والخيارات. schemaLocation يتيح الخيار استنتاج المخطط وتطوره. الصق التعليمات البرمجية التالية في خلية دفتر ملاحظات Databricks وقم بتشغيل الخلية لإنشاء DataFrame دفق باسم raw_df:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
مثل عمليات القراءة الأخرى على Azure Databricks، لا يؤدي تكوين قراءة الدفق إلى تحميل البيانات فعليا. يجب تشغيل إجراء على البيانات قبل بدء الدفق.
إشعار
يؤدي استدعاء display() DataFrame المتدفق إلى بدء مهمة دفق. بالنسبة لمعظم حالات استخدام Structured Streaming، يجب أن يكون الإجراء الذي يقوم بتشغيل الدفق هو كتابة البيانات إلى متلقي. راجع اعتبارات الإنتاج للبث المنظم.
إجراء تحويل دفق
يدعم Structured Streaming معظم التحويلات المتوفرة في Azure Databricks وSpark SQL. يمكنك حتى تحميل نماذج MLflow ك UDFs وإجراء تنبؤات الدفق كتحول.
يكمل مثال التعليمات البرمجية التالي تحويلا بسيطا لإثراء بيانات JSON التي تم استيعابها بمعلومات إضافية باستخدام وظائف Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
يحتوي الناتج transformed_df على إرشادات الاستعلام لتحميل كل سجل وتحويله عند وصوله إلى مصدر البيانات.
إشعار
يتعامل الدفق المنظم مع مصادر البيانات كمجموعات بيانات غير محدودة أو لانهائية. على هذا النحو، لا يتم دعم بعض التحويلات في أحمال عمل Structured Streaming لأنها تتطلب فرز عدد لا نهائي من العناصر.
تتطلب معظم التجميعات والعديد من الصلات إدارة معلومات الحالة باستخدام العلامات المائية والنوافذ ووضع الإخراج. راجع تطبيق العلامات المائية للتحكم في حدود معالجة البيانات.
تنفيذ كتابة دفعة تزايدية إلى Delta Lake
يكتب المثال التالي إلى Delta Lake باستخدام مسار ملف محدد ونقطة تحقق.
هام
تأكد دائما من تحديد موقع نقطة تحقق فريد لكل كاتب تدفق تقوم بتكوينه. توفر نقطة التحقق الهوية الفريدة للتدفق الخاص بك، وتتبع جميع السجلات التي تمت معالجتها ومعلومات الحالة المرتبطة باستعلام البث.
availableNow يرشد إعداد المشغل Structured Streaming لمعالجة جميع السجلات غير المعالجة مسبقا من مجموعة البيانات المصدر ثم إيقاف التشغيل، حتى تتمكن من تنفيذ التعليمات البرمجية التالية بأمان دون القلق بشأن ترك دفق قيد التشغيل:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
في هذا المثال، لا تصل أي سجلات جديدة إلى مصدر البيانات الخاص بنا، لذلك لا يستبدل تكرار تنفيذ هذه التعليمة البرمجية السجلات الجديدة.
تحذير
يمكن أن يمنع تنفيذ الدفق المنظم الإنهاء التلقائي من إيقاف تشغيل موارد الحوسبة. لتجنب التكاليف غير المتوقعة، تأكد من إنهاء استعلامات الدفق.
قراءة البيانات من Delta Lake وتحويلها وكتابتها إلى Delta Lake
يتمتع Delta Lake بدعم واسع للعمل مع Structured Streaming كمصدر ومتلقي. راجع قراءات وكتابات دفق جدول Delta.
يوضح المثال التالي بناء جملة المثال لتحميل كافة السجلات الجديدة بشكل متزايد من جدول Delta، وربطها بلقطة لجدول Delta آخر، وكتابتها في جدول Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
يجب أن يكون لديك أذونات مناسبة تم تكوينها لقراءة جداول المصدر والكتابة إلى الجداول الهدف وموقع نقطة التحقق المحددة. املأ جميع المعلمات التي تمت الإشارة إليها بأقواس زاوية (<>) باستخدام القيم ذات الصلة لمصادر البيانات والمتلقين.
إشعار
توفر Delta Live Tables بناء جملة تعريفيا بالكامل لإنشاء مسارات Delta Lake وتدير خصائص مثل المشغلات ونقاط التحقق تلقائيا. راجع ما هي جداول Delta Live؟.
قراءة البيانات من Kafka وتحويلها وكتابتها إلى Kafka
يوفر Apache Kafka وحافلات المراسلة الأخرى بعض أقل زمن انتقال متاح لمجموعات البيانات الكبيرة. يمكنك استخدام Azure Databricks لتطبيق التحويلات على البيانات التي تم استيعابها من Kafka ثم كتابة البيانات مرة أخرى إلى Kafka.
إشعار
تضيف كتابة البيانات إلى تخزين الكائنات السحابية مقدارا إضافيا من زمن الانتقال. إذا كنت ترغب في تخزين البيانات من ناقل مراسلة في Delta Lake ولكنك تحتاج إلى أقل زمن انتقال ممكن لأحمال العمل المتدفقة، توصي Databricks بتكوين مهام دفق منفصلة لاستيعاب البيانات إلى lakehouse وتطبيق تحويلات قريبة من الوقت الحقيقي لأحواض ناقل المراسلة النهائية.
يوضح مثال التعليمات البرمجية التالي نمطا بسيطا لإثراء البيانات من Kafka عن طريق ضمها مع البيانات في جدول Delta ثم الكتابة مرة أخرى إلى Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
يجب أن يكون لديك أذونات مناسبة تم تكوينها للوصول إلى خدمة Kafka. املأ جميع المعلمات التي تمت الإشارة إليها بأقواس زاوية (<>) باستخدام القيم ذات الصلة لمصادر البيانات والمتلقين. راجع معالجة الدفق باستخدام Apache Kafka وAzure Databricks.