مشاركة عبر


read_pulsar دفق دالة قيم الجدول

ينطبق على: وضع علامة Databricks SQL وضع علامة Databricks Runtime 14.1 وما فوق

هام

هذه الميزة في المعاينة العامة.

إرجاع جدول مع قراءة السجلات من Pulsar.

تدعم هذه الدالة ذات القيمة الجدولية البث فقط وليس الاستعلام الدفعي.

بناء الجملة

read_pulsar ( { option_key => option_value } [, ...] )

الوسيطات

تتطلب هذه الدالة استدعاء معلمة مسماة لمفاتيح الخيارات.

الخيارات serviceUrl و topic إلزامية.

أوصاف الوسيطات موجزة هنا. راجع وثائق Pulsar المتدفقة المنظمة للحصول على أوصاف موسعة.

خيار نوع Default ‏‏الوصف
serviceUrl سلسلة إلزامي URI لخدمة Pulsar.
الموضوع سلسلة إلزامي الموضوع للقراءة منه.
الاشتراك المحدد مسبقا سلسلة بلا اسم الاشتراك المحدد مسبقا المستخدم من قبل الموصل لتتبع تقدم تطبيق spark.
subscriptionPrefix سلسلة بلا بادئة يستخدمها الموصل لإنشاء اشتراك عشوائي لتتبع تقدم تطبيق spark.
مهلة الاستقصاء LONG 120000 مهلة قراءة الرسائل من Pulsar بالمللي ثانية.
failOnDataLoss BOOLEAN صحيح يتحكم في فشل استعلام عند فقدان البيانات (على سبيل المثال، يتم حذف الموضوعات أو حذف الرسائل بسبب نهج الاستبقاء).
بدء تشغيل مجموعات سلسلة الأحدث نقطة البدء عند بدء تشغيل استعلام، إما أقدم أو أحدث أو سلسلة JSON تحدد إزاحة معينة. إذا كان الأحدث، يقرأ القارئ أحدث السجلات بعد بدء تشغيله. إذا كان أقرب، يقرأ القارئ من أقرب إزاحة. يمكن للمستخدم أيضا تحديد سلسلة JSON التي تحدد إزاحة معينة.
وقت البدء سلسلة بلا عند التحديد، سيقرأ مصدر Pulsar الرسائل بدءا من موضع وقت البدء المحدد.

يتم استخدام الوسيطات التالية لمصادقة عميل pulsar:

خيار نوع Default ‏‏الوصف
pulsarClientAuthPluginClassName سلسلة بلا اسم المكون الإضافي للمصادقة.
pulsarClientAuthParams سلسلة بلا معلمات المكون الإضافي للمصادقة.
pulsarClientUseKeyStoreTls سلسلة بلا ما إذا كان يجب استخدام KeyStore لمصادقة tls.
pulsarClientTlsTrustStoreType سلسلة بلا نوع ملف TrustStore لمصادقة tls.
pulsarClientTlsTrustStorePath سلسلة بلا مسار ملف TrustStore لمصادقة tls.
pulsarClientTlsTrustStorePassword سلسلة بلا كلمة مرور TrustStore لمصادقة tls.

يتم استخدام هذه الوسيطات لتكوين ومصادقة التحكم في قبول pulsar، تكوين مسؤول pulsar مطلوب فقط عند تمكين التحكم في القبول (عند تعيين maxBytesPerTrigger)

خيار نوع Default ‏‏الوصف
maxBytesPerTrigger BIGINT بلا حد بسيط للحد الأقصى لعدد وحدات البايت التي نريد معالجتها لكل ميكروباتش. إذا تم تحديد ذلك، يجب أيضا تحديد admin.url.
adminUrl سلسلة بلا تكوين خدمة PulsarHttpUrl. مطلوب فقط عند تحديد maxBytesPerTrigger.
pulsarAdminAuthPlugin سلسلة بلا اسم المكون الإضافي للمصادقة.
pulsarAdminAuthParams سلسلة بلا معلمات المكون الإضافي للمصادقة.
pulsarClientUseKeyStoreTls سلسلة بلا ما إذا كان يجب استخدام KeyStore لمصادقة tls.
pulsarAdminTlsTrustStoreType سلسلة بلا نوع ملف TrustStore لمصادقة tls.
pulsarAdminTlsTrustStorePath سلسلة بلا مسار ملف TrustStore لمصادقة tls.
pulsarAdminTlsTrustStorePassword سلسلة بلا كلمة مرور TrustStore لمصادقة tls.

المرتجعات

جدول سجلات pulsar مع المخطط التالي.

  • __key STRING NOT NULL: مفتاح رسالة Pulsar.

  • value BINARY NOT NULL: قيمة رسالة Pulsar.

    ملاحظة: بالنسبة للمواضيع ذات مخطط Avro أو JSON، بدلا من تحميل المحتوى في حقل قيمة ثنائية، سيتم توسيع المحتوى للحفاظ على أسماء الحقول وأنواع الحقول لموضوع Pulsar.

  • __topic STRING NOT NULL: اسم موضوع Pulsar.

  • __messageId BINARY NOT NULL: معرف رسالة Pulsar.

  • __publishTime TIMESTAMP NOT NULL: وقت نشر رسالة Pulsar.

  • __eventTime TIMESTAMP NOT NULL: وقت حدث رسالة Pulsar.

  • __messageProperties MAP<STRING, STRING>: خصائص رسالة Pulsar.

الأمثلة

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.