مشاركة عبر


read_pubsub دفق دالة قيم الجدول

ينطبق على: وضع علامة Databricks SQL وضع علامة Databricks Runtime 13.3 LTS وما فوق

إرجاع جدول يحتوي على سجلات مقروءة من Pub/Sub من موضوع. يدعم الاستعلامات المتدفقة فقط.

بناء الجملة

read_pubsub( { parameter => value } [, ...])

الوسيطات

read_pubsub يتطلب استدعاء المعلمة المسماة.

الوسيطات المطلوبة الوحيدة هي subscriptionIdو projectIdو.topicId جميع الوسيطات الأخرى اختيارية.

للحصول على أوصاف كاملة للوسيطات، راجع تكوين خيارات قراءة بث Pub/Sub.

توصي Databricks باستخدام الأسرار عند توفير خيارات التخويل. راجع الدالة السرية.

للحصول على تفاصيل حول تكوين الوصول إلى Pub/Sub، راجع تكوين الوصول إلى Pub/Sub.

المعلمة النوع ‏‏الوصف
subscriptionId STRING مطلوب، المعرف الفريد المعين لاشتراك Pub/Sub.
projectId STRING مطلوب، معرف مشروع Google Cloud المرتبط بموضوع Pub/Sub.
topicId STRING مطلوب، معرف أو اسم موضوع Pub/Sub للاشتراك فيه.
clientEmail STRING عنوان البريد الإلكتروني المقترن بحساب خدمة للمصادقة.
clientId STRING معرف العميل المقترن بحساب الخدمة للمصادقة.
privateKeyId STRING معرف المفتاح الخاص المقترن بحساب الخدمة.
privateKey STRING المفتاح الخاص المقترن بحساب الخدمة للمصادقة.

تستخدم هذه الوسيطات لمزيد من الضبط الدقيق عند القراءة من Pub/Sub:

المعلمة النوع ‏‏الوصف
numFetchPartitions STRING اختياري مع العدد الافتراضي للمنفذين. عدد مهام Spark المتوازية التي تجلب السجلات من اشتراك.
deleteSubscriptionOnStreamStop BOOLEAN اختياري مع الافتراضي false. إذا تم تعيينه إلى صحيح، يتم حذف الاشتراك الذي تم تمريره إلى الدفق عند انتهاء مهمة الدفق.
maxBytesPerTrigger STRING حد بسيط لحجم الدفعة المراد معالجته أثناء كل دفعة صغيرة يتم تشغيلها. الإعداد الافتراضي هو "none".
maxRecordsPerFetch STRING عدد السجلات التي يجب إحضارها لكل مهمة قبل معالجة السجلات. الافتراضي هو "1000".
maxFetchPeriod STRING المدة الزمنية لكل مهمة لإحضارها قبل معالجة السجلات. الإعداد الافتراضي هو "10s".

المرتجعات

جدول سجلات Pub/Sub مع المخطط التالي. قد يكون عمود السمات فارغا ولكن كافة الأعمدة الأخرى ليست خالية.

الاسم نوع البيانات بدون قيمة قياسي ‏‏الوصف
messageId STRING لا معرف فريد لرسالة Pub/Sub.
payload BINARY لا محتوى الرسالة Pub/Sub.
attributes STRING ‏‏نعم‬ أزواج قيم المفاتيح التي تمثل سمات رسالة Pub/Sub. هذه سلسلة مرمزة ب json.
publishTimestampInMillis BIGINT لا الطابع الزمني عند نشر الرسالة، بالمللي ثانية.
sequenceNumber BIGINT لا المعرف الفريد للسجل داخل الجزء الخاص به.

الأمثلة

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

وينبغي الآن الاستعلام عن البيانات من أجل إجراء مزيد من testing.streaming_table التحليل.

استعلامات خاطئة:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);