read_kinesis دفق دالة قيم الجدول

ينطبق على: وضع علامة Databricks SQL وضع علامة Databricks Runtime 13.3 LTS وما فوق

إرجاع جدول يحتوي على سجلات مقروءة من Kinesis من دفق واحد أو أكثر.

بناء الجملة

read_kinesis ( { parameter => value } [, ...] )

الوسيطات

read_kinesis يتطلب استدعاء المعلمة المسماة.

الوسيطة المطلوبة الوحيدة هي streamName. جميع الوسيطات الأخرى اختيارية.

أوصاف الوسيطات موجزة هنا. لمزيد من التفاصيل، راجع وثائق Amazon Kinesis .

هناك خيارات اتصال مختلفة للاتصال والمصادقة مع AWS. awsAccessKey، ويمكن تحديدها awsSecretKey في وسيطات الدالة باستخدام الدالة السرية، أو تعيينها يدويا في الوسيطات، أو تكوينها كمتغيرات بيئة كما هو موضح أدناه. roleArnroleSessionName، roleExternalIDيمكن أيضا استخدام للمصادقة مع AWS باستخدام ملفات تعريف المثيل. إذا لم يتم تحديد أي من هذه، فسيستخدم سلسلة موفر AWS الافتراضية.

المعلمة النوع ‏‏الوصف
streamName STRING مطلوب، قائمة مفصولة بفواصل لواحد أو أكثر من تدفقات kinesis.
awsAccessKey STRING مفتاح الوصول إلى AWS، إن وجد. يمكن أيضا تحديد من خلال الخيارات المختلفة المدعومة من خلال سلسلة موفر بيانات الاعتماد الافتراضية AWS بما في ذلك متغيرات البيئة (AWS_ACCESS_KEY_ID) وملف ملفات تعريف بيانات الاعتماد.
awsSecretKey STRING المفتاح السري الذي يتوافق مع مفتاح الوصول. يمكن تحديد إما في الوسيطات أو من خلال الخيارات المختلفة المدعومة من خلال سلسلة موفر بيانات الاعتماد الافتراضية AWS بما في ذلك متغيرات البيئة (AWS_SECRET_KEY أو AWS_SECRET_ACCESS_KEY) وملف ملفات تعريف بيانات الاعتماد.
roleArn STRING اسم مورد Amazon للدور الذي يجب أن تفترضه عند الوصول إلى Kinesis.
roleExternalId STRING يستخدم عند تفويض الوصول إلى حساب AWS.
roleSessionName STRING اسم جلسة دور AWS.
stsEndpoint STRING نقطة نهاية لطلب بيانات اعتماد الوصول المؤقتة.
region STRING منطقة للتدفقات التي سيتم تحديدها. الإعداد الافتراضي هو المنطقة التي تم حلها محليا.
endpoint STRING نقطة النهاية الإقليمية لتدفقات بيانات Kinesis. الإعداد الافتراضي هو المنطقة التي تم حلها محليا.
initialPosition STRING موضع البدء للقراءة من في الدفق. واحد من: "الأحدث" (افتراضي)، "trim_horizon"، "الأقدم"، "at_timestamp".
consumerMode STRING واحد من: "الاستقصاء" (افتراضي)، أو "EFO" (محسن-fan-out).
consumerName STRING اسم المستهلك. جميع المستهلكين مسبوقة ب "databricks_". الافتراضي هو سلسلة فارغة.
registerConsumerTimeoutInterval STRING الحد الأقصى لمهلة الانتظار حتى يتم تسجيل مستهلك Kinesis EFO مع دفق Kinesis قبل طرح خطأ. الإعداد الافتراضي هو "300s".
requireConsumerDeregistration BOOLEAN true لإلغاء تسجيل مستهلك EFO عند إنهاء الاستعلام. القيمة الافتراضية هي false.
deregisterConsumerTimeoutInterval STRING الحد الأقصى لمهلة الانتظار حتى يتم إلغاء تسجيل مستهلك Kinesis EFO مع دفق Kinesis قبل طرح خطأ. الإعداد الافتراضي هو "300s".
consumerRefreshInterval STRING الفاصل الزمني الذي يتم فيه فحص المستهلك وتحديثه. الإعداد الافتراضي هو "300s".

يتم استخدام الوسيطات التالية للتحكم في معدل نقل القراءة وزمن الانتقال ل Kinesis:

المعلمة النوع ‏‏الوصف
maxRecordsPerFetch INTEGER (>0) اختياري، مع افتراضي 10000 سجل للقراءة لكل طلب واجهة برمجة تطبيقات إلى Kinesis.
maxFetchRate STRING مدى سرعة الإحضار المسبق للبيانات لكل جزء. قيمة بين "1.0" و"2.0" يتم قياسها بالميغابايت/ثانية. الإعداد الافتراضي هو "1.0".
minFetchPeriod STRING الحد الأقصى لوقت الانتظار بين محاولات الجلب المسبق المتتالية. الإعداد الافتراضي هو "400 مللي ثانية".
maxFetchDuration STRING الحد الأقصى لمدة تخزين البيانات الجديدة المحفوفة مسبقا مؤقتا. الإعداد الافتراضي هو "10s".
fetchBufferSize STRING كمية البيانات للمشغل التالي. الإعداد الافتراضي هو '20 غيغابايت'.
shardsPerTask INTEGER (>0) عدد أجزاء Kinesis التي يتم الإحضار المسبق منها بالتوازي لكل مهمة spark. القيمة الافتراضية هي 5.
shardFetchinterval STRING عدد المرات التي يتم فيها الاستقصاء عن إعادة التقسيم. الإعداد الافتراضي هو '1s'.
coalesceThresholdBlockSize INTEGER (>0) الحد الذي يحدث عنده الاندماج التلقائي. الافتراضي هو 10,000,000.
coalesce BOOLEAN true لدمج الطلبات مسبقة الجلب. الافتراضي هو true.
coalesceBinSize INTEGER (>0) حجم الكتلة التقريبي بعد الاندماج. الافتراضي هو 128,000,000.
reuseKinesisClient BOOLEAN true لإعادة استخدام عميل Kinesis المخزن في ذاكرة التخزين المؤقت. الإعداد الافتراضي هو true باستثناء مجموعة PE.
clientRetries INTEGER (>0) عدد مرات إعادة المحاولة في سيناريو إعادة المحاولة. القيمة الافتراضية هي 5.

المرتجعات

جدول سجلات Kinesis مع المخطط التالي:

الاسم نوع البيانات بدون قيمة قياسي ‏‏الوصف
partitionKey STRING لا مفتاح يستخدم لتوزيع البيانات بين أجزاء الدفق. ستتم قراءة جميع سجلات البيانات بنفس مفتاح القسم من نفس الجزء.
data BINARY لا حمولة بيانات kinesis، مرمزة ب base-64.
stream STRING لا اسم الدفق الذي تمت قراءة البيانات منه.
shardId STRING لا معرف فريد للجزء الذي تمت قراءة البيانات منه.
sequenceNumber BIGINT لا المعرف الفريد للسجل داخل الجزء الخاص به.
approximateArrivalTimestamp TIMESTAMP لا الوقت التقريبي الذي تم فيه إدراج السجل في الدفق.

تشكل الأعمدة (stream, shardId, sequenceNumber) مفتاحا أساسيا.

الأمثلة

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');