إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على:
Databricks SQL
Databricks Runtime 13.3 LTS وما فوق
إرجاع جدول يحتوي على سجلات مقروءة من Pub/Sub من موضوع. يدعم الاستعلامات المتدفقة فقط.
بناء الجملة
read_pubsub( { parameter => value } [, ...])
الوسيطات
read_pubsub يتطلب استدعاء المعلمة المسماة.
الوسيطات المطلوبة الوحيدة هي subscriptionIdو projectIdو.topicId جميع الوسيطات الأخرى اختيارية.
للحصول على أوصاف كاملة للوسيطات، راجع تكوين خيارات قراءة بث Pub/Sub.
توصي Databricks باستخدام الأسرار عند توفير خيارات التخويل. راجع الدالة السرية.
للحصول على تفاصيل حول تكوين الوصول إلى Pub/Sub، راجع تكوين الوصول إلى Pub/Sub.
| المعلمة | النوع | الوصف |
|---|---|---|
subscriptionId |
STRING |
مطلوب، المعرف الفريد المعين لاشتراك Pub/Sub. |
projectId |
STRING |
مطلوب، معرف مشروع Google Cloud المرتبط بموضوع Pub/Sub. |
topicId |
STRING |
مطلوب، معرف أو اسم موضوع Pub/Sub للاشتراك فيه. |
clientEmail |
STRING |
عنوان البريد الإلكتروني المقترن بحساب خدمة للمصادقة. |
clientId |
STRING |
معرف العميل المقترن بحساب الخدمة للمصادقة. |
privateKeyId |
STRING |
معرف المفتاح الخاص المقترن بحساب الخدمة. |
privateKey |
STRING |
المفتاح الخاص المقترن بحساب الخدمة للمصادقة. |
تستخدم هذه الوسيطات لمزيد من الضبط الدقيق عند القراءة من Pub/Sub:
| المعلمة | النوع | الوصف |
|---|---|---|
numFetchPartitions |
STRING |
اختياري مع العدد الافتراضي للمنفذين. عدد مهام Spark المتوازية التي تجلب السجلات من اشتراك. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
اختياري مع الافتراضي false. إذا تم تعيينه إلى صحيح، يتم حذف الاشتراك الذي تم تمريره إلى الدفق عند انتهاء مهمة الدفق. |
maxBytesPerTrigger |
STRING |
حد بسيط لحجم الدفعة المراد معالجته أثناء كل دفعة صغيرة يتم تشغيلها. الإعداد الافتراضي هو "none". |
maxRecordsPerFetch |
STRING |
عدد السجلات التي يجب إحضارها لكل مهمة قبل معالجة السجلات. الافتراضي هو "1000". |
maxFetchPeriod |
STRING |
المدة الزمنية لكل مهمة لإحضارها قبل معالجة السجلات. الإعداد الافتراضي هو "10s". |
المرتجعات
جدول سجلات Pub/Sub مع المخطط التالي. قد يكون عمود السمات فارغا ولكن كافة الأعمدة الأخرى ليست خالية.
| الاسم | نوع البيانات | بدون قيمة | قياسي | الوصف |
|---|---|---|---|---|
messageId |
STRING |
لا | معرف فريد لرسالة Pub/Sub. | |
payload |
BINARY |
لا | محتوى الرسالة Pub/Sub. | |
attributes |
STRING |
نعم | أزواج قيم المفاتيح التي تمثل سمات رسالة Pub/Sub. هذه سلسلة مرمزة ب json. | |
publishTimestampInMillis |
BIGINT |
لا | الطابع الزمني عند نشر الرسالة، بالمللي ثانية. | |
sequenceNumber |
BIGINT |
لا | المعرف الفريد للسجل داخل الجزء الخاص به. |
الأمثلة
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
وينبغي الآن الاستعلام عن البيانات من أجل إجراء مزيد من testing.streaming_table التحليل.
استعلامات خاطئة:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);