إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
في Databricks Runtime 14.1 وما فوق، يمكنك استخدام Structured Streaming لدفق البيانات من Apache Pulsar على Azure Databricks.
يوفر الدفق المنظم دلالات معالجة مرة واحدة بالضبط للبيانات المقروءة من مصادر Pulsar.
مثال على بناء الجملة
فيما يلي مثال أساسي لاستخدام Structured Streaming للقراءة من Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
يجب عليك دائما توفير service.url و أحد الخيارات التالية لتحديد الموضوعات:
topictopicstopicsPattern
للحصول على قائمة كاملة بالخيارات، راجع تكوين خيارات قراءة تدفق Pulsar.
المصادقة على Pulsar
يدعم Azure Databricks مصادقة مخزن الثقة ومخزن المفاتيح إلى Pulsar. توصي Databricks باستخدام الأسرار عند تخزين تفاصيل التكوين.
يمكنك تعيين الخيارات التالية أثناء تكوين الدفق:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
إذا كان الدفق يستخدم PulsarAdmin، فقم أيضا بتعيين ما يلي:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
يوضح المثال التالي تكوين خيارات المصادقة:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
مخطط Pulsar
يعتمد مخطط السجلات المقروءة من Pulsar على كيفية ترميز الموضوعات لمخططاتها.
- بالنسبة للمواضيع ذات مخطط Avro أو JSON، يتم الاحتفاظ بأسماء الحقول وأنواع الحقول في Spark DataFrame الناتج.
- بالنسبة للمواضيع التي لا تحتوي على مخطط أو بنوع بيانات بسيط في Pulsar، يتم تحميل الحمولة إلى
valueعمود. - إذا تم تكوين القارئ لقراءة مواضيع متعددة بمخططات مختلفة، فقم بتعيين
allowDifferentTopicSchemasلتحميل المحتوى الخام إلىvalueعمود.
تحتوي سجلات Pulsar على حقول بيانات التعريف التالية:
| Column | النوع |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
تكوين خيارات لقراءة تدفق Pulsar
يتم تكوين جميع الخيارات كجزء من قراءة Structured Streaming باستخدام .option("<optionName>", "<optionValue>") بناء الجملة. يمكنك أيضا تكوين المصادقة باستخدام الخيارات. راجع المصادقة على Pulsar.
يصف الجدول التالي التكوينات المطلوبة ل Pulsar. يجب تحديد خيار واحد فقط أو topictopics topicsPattern.
| خيار | القيمة الافتراضية | الوصف |
|---|---|---|
service.url |
لا شيء | تكوين Pulsar serviceUrl لخدمة Pulsar. |
topic |
لا شيء | سلسلة اسم موضوع ليستهلكها الموضوع. |
topics |
لا شيء | قائمة مفصولة بفواصل للمواضيع التي يجب استهلاكها. |
topicsPattern |
لا شيء | سلسلة Java regex لتتطابق مع الموضوعات التي يجب استهلاكها. |
يصف الجدول التالي الخيارات الأخرى المدعومة ل Pulsar:
| خيار | القيمة الافتراضية | الوصف |
|---|---|---|
predefinedSubscription |
لا شيء | اسم الاشتراك المحدد مسبقا المستخدم من قبل الموصل لتتبع تقدم تطبيق spark. |
subscriptionPrefix |
لا شيء | بادئة يستخدمها الموصل لإنشاء اشتراك عشوائي لتتبع تقدم تطبيق spark. |
pollTimeoutMs |
120000 | مهلة قراءة الرسائل من Pulsar بالمللي ثانية. |
waitingForNonExistedTopic |
false |
ما إذا كان يجب أن ينتظر الموصل حتى يتم إنشاء الموضوعات المطلوبة. |
failOnDataLoss |
true |
يتحكم في فشل استعلام عند فقدان البيانات (على سبيل المثال، يتم حذف الموضوعات أو حذف الرسائل بسبب نهج الاستبقاء). |
allowDifferentTopicSchemas |
false |
إذا تم قراءة مواضيع متعددة ذات مخططات مختلفة، فاستخدم هذه المعلمة لإيقاف تشغيل إلغاء التسلسل التلقائي لقيمة الموضوع المستندة إلى المخطط. يتم إرجاع القيم الأولية فقط عندما يكون هذا هو true. |
startingOffsets |
latest |
إذا latest، يقرأ القارئ أحدث السجلات بعد بدء تشغيله. إذا earliest، يقرأ القارئ من أقرب إزاحة. يمكن للمستخدم أيضا تحديد سلسلة JSON التي تحدد إزاحة معينة. |
maxBytesPerTrigger |
لا شيء | حد بسيط للحد الأقصى لعدد وحدات البايت التي نريد معالجتها لكل ميكروباتش. إذا تم تحديد ذلك، admin.url يجب أيضا تحديده. |
admin.url |
لا شيء | تكوين Pulsar serviceHttpUrl . مطلوب فقط عند maxBytesPerTrigger تحديد. |
يمكنك أيضا تحديد أي تكوينات عميل ومسؤول وقارئ Pulsar باستخدام الأنماط التالية:
| النمط | ارتباط بخيارات التكوين |
|---|---|
pulsar.client.* |
تكوين عميل Pulsar |
pulsar.admin.* |
تكوين مسؤول Pulsar |
pulsar.reader.* |
تكوين قارئ Pulsar |
إنشاء إزاحات بدء تشغيل JSON
يمكنك إنشاء معرف رسالة يدويا لتحديد إزاحة معينة وتمرير هذا ك JSON إلى startingOffsets الخيار . يوضح مثال التعليمات البرمجية التالي بناء الجملة هذا:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()