إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على:
Databricks SQL
Databricks Runtime 14.1 وما فوق
إرجاع جدول مع قراءة السجلات من Pulsar.
تدعم هذه الدالة ذات القيمة الجدولية البث فقط وليس الاستعلام الدفعي.
بناء الجملة
read_pulsar ( { option_key => option_value } [, ...] )
الوسيطات
تتطلب هذه الدالة استدعاء معلمة مسماة لمفاتيح الخيارات.
الخيارات serviceUrl و topic إلزامية.
أوصاف الوسيطات موجزة هنا. راجع وثائق Pulsar المتدفقة المنظمة للحصول على أوصاف موسعة.
| خيار | نوع | Default | الوصف |
|---|---|---|---|
| serviceUrl | سلسلة | إلزامي | URI لخدمة Pulsar. |
| الموضوع | سلسلة | إلزامي | الموضوع للقراءة منه. |
| الاشتراك المحدد مسبقا | سلسلة | بلا | اسم الاشتراك المحدد مسبقا المستخدم من قبل الموصل لتتبع تقدم تطبيق spark. |
| subscriptionPrefix | سلسلة | بلا | بادئة يستخدمها الموصل لإنشاء اشتراك عشوائي لتتبع تقدم تطبيق spark. |
| مهلة الاستقصاء | LONG | 120000 | مهلة قراءة الرسائل من Pulsar بالمللي ثانية. |
| failOnDataLoss | BOOLEAN | صحيح | يتحكم في فشل استعلام عند فقدان البيانات (على سبيل المثال، يتم حذف الموضوعات أو حذف الرسائل بسبب نهج الاستبقاء). |
| بدء تشغيل مجموعات | سلسلة | الأحدث | نقطة البدء عند بدء تشغيل استعلام، إما أقدم أو أحدث أو سلسلة JSON تحدد إزاحة معينة. إذا كان الأحدث، يقرأ القارئ أحدث السجلات بعد بدء تشغيله. إذا كان أقرب، يقرأ القارئ من أقرب إزاحة. يمكن للمستخدم أيضا تحديد سلسلة JSON التي تحدد إزاحة معينة. |
| وقت البدء | سلسلة | بلا | عند التحديد، سيقرأ مصدر Pulsar الرسائل بدءا من موضع وقت البدء المحدد. |
يتم استخدام الوسيطات التالية لمصادقة عميل pulsar:
| خيار | نوع | Default | الوصف |
|---|---|---|---|
| pulsarClientAuthPluginClassName | سلسلة | بلا | اسم المكون الإضافي للمصادقة. |
| pulsarClientAuthParams | سلسلة | بلا | معلمات المكون الإضافي للمصادقة. |
| pulsarClientUseKeyStoreTls | سلسلة | بلا | ما إذا كان يجب استخدام KeyStore لمصادقة tls. |
| pulsarClientTlsTrustStoreType | سلسلة | بلا | نوع ملف TrustStore لمصادقة tls. |
| pulsarClientTlsTrustStorePath | سلسلة | بلا | مسار ملف TrustStore لمصادقة tls. |
| pulsarClientTlsTrustStorePassword | سلسلة | بلا | كلمة مرور TrustStore لمصادقة tls. |
يتم استخدام هذه الوسيطات لتكوين ومصادقة التحكم في قبول pulsar، تكوين مسؤول pulsar مطلوب فقط عند تمكين التحكم في القبول (عند تعيين maxBytesPerTrigger)
| خيار | نوع | Default | الوصف |
|---|---|---|---|
| maxBytesPerTrigger | BIGINT | بلا | حد بسيط للحد الأقصى لعدد وحدات البايت التي نريد معالجتها لكل ميكروباتش. إذا تم تحديد ذلك، يجب أيضا تحديد admin.url. |
| adminUrl | سلسلة | بلا | تكوين خدمة PulsarHttpUrl. مطلوب فقط عند تحديد maxBytesPerTrigger. |
| pulsarAdminAuthPlugin | سلسلة | بلا | اسم المكون الإضافي للمصادقة. |
| pulsarAdminAuthParams | سلسلة | بلا | معلمات المكون الإضافي للمصادقة. |
| pulsarClientUseKeyStoreTls | سلسلة | بلا | ما إذا كان يجب استخدام KeyStore لمصادقة tls. |
| pulsarAdminTlsTrustStoreType | سلسلة | بلا | نوع ملف TrustStore لمصادقة tls. |
| pulsarAdminTlsTrustStorePath | سلسلة | بلا | مسار ملف TrustStore لمصادقة tls. |
| pulsarAdminTlsTrustStorePassword | سلسلة | بلا | كلمة مرور TrustStore لمصادقة tls. |
المرتجعات
جدول سجلات pulsar مع المخطط التالي.
__key STRING NOT NULL: مفتاح رسالة Pulsar.value BINARY NOT NULL: قيمة رسالة Pulsar.ملاحظة: بالنسبة للمواضيع ذات مخطط Avro أو JSON، بدلا من تحميل المحتوى في حقل قيمة ثنائية، سيتم توسيع المحتوى للحفاظ على أسماء الحقول وأنواع الحقول لموضوع Pulsar.
__topic STRING NOT NULL: اسم موضوع Pulsar.__messageId BINARY NOT NULL: معرف رسالة Pulsar.__publishTime TIMESTAMP NOT NULL: وقت نشر رسالة Pulsar.__eventTime TIMESTAMP NOT NULL: وقت حدث رسالة Pulsar.__messageProperties MAP<STRING, STRING>: خصائص رسالة Pulsar.
الأمثلة
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.