read_kinesis
دفق دالة قيم الجدول
ينطبق على: Databricks SQL Databricks Runtime 13.3 LTS وما فوق
إرجاع جدول يحتوي على سجلات مقروءة من Kinesis من دفق واحد أو أكثر.
بناء الجملة
read_kinesis ( { parameter => value } [, ...] )
الوسيطات
read_kinesis
يتطلب استدعاء المعلمة المسماة.
الوسيطة المطلوبة الوحيدة هي streamName
. جميع الوسيطات الأخرى اختيارية.
أوصاف الوسيطات موجزة هنا. لمزيد من التفاصيل، راجع وثائق Amazon Kinesis .
هناك خيارات اتصال مختلفة للاتصال والمصادقة مع AWS.
awsAccessKey
، ويمكن تحديدها awsSecretKey
في وسيطات الدالة باستخدام الدالة السرية، أو تعيينها يدويا في الوسيطات، أو تكوينها كمتغيرات بيئة كما هو موضح أدناه.
roleArn
roleSessionName
، roleExternalID
يمكن أيضا استخدام للمصادقة مع AWS باستخدام ملفات تعريف المثيل.
إذا لم يتم تحديد أي من هذه، فسيستخدم سلسلة موفر AWS الافتراضية.
المعلمة | النوع | الوصف |
---|---|---|
streamName |
STRING |
مطلوب، قائمة مفصولة بفواصل لواحد أو أكثر من تدفقات kinesis. |
awsAccessKey |
STRING |
مفتاح الوصول إلى AWS، إن وجد. يمكن أيضا تحديد من خلال الخيارات المختلفة المدعومة من خلال سلسلة موفر بيانات الاعتماد الافتراضية AWS بما في ذلك متغيرات البيئة (AWS_ACCESS_KEY_ID ) وملف ملفات تعريف بيانات الاعتماد. |
awsSecretKey |
STRING |
المفتاح السري الذي يتوافق مع مفتاح الوصول. يمكن تحديد إما في الوسيطات أو من خلال الخيارات المختلفة المدعومة من خلال سلسلة موفر بيانات الاعتماد الافتراضية AWS بما في ذلك متغيرات البيئة (AWS_SECRET_KEY أو AWS_SECRET_ACCESS_KEY ) وملف ملفات تعريف بيانات الاعتماد. |
roleArn |
STRING |
اسم مورد Amazon للدور الذي يجب أن تفترضه عند الوصول إلى Kinesis. |
roleExternalId |
STRING |
يستخدم عند تفويض الوصول إلى حساب AWS. |
roleSessionName |
STRING |
اسم جلسة دور AWS. |
stsEndpoint |
STRING |
نقطة نهاية لطلب بيانات اعتماد الوصول المؤقتة. |
region |
STRING |
منطقة للتدفقات التي سيتم تحديدها. الإعداد الافتراضي هو المنطقة التي تم حلها محليا. |
endpoint |
STRING |
نقطة النهاية الإقليمية لتدفقات بيانات Kinesis. الإعداد الافتراضي هو المنطقة التي تم حلها محليا. |
initialPosition |
STRING |
موضع البدء للقراءة من في الدفق. واحد من: "الأحدث" (افتراضي)، "trim_horizon"، "الأقدم"، "at_timestamp". |
consumerMode |
STRING |
واحد من: "الاستقصاء" (افتراضي)، أو "EFO" (محسن-fan-out). |
consumerName |
STRING |
اسم المستهلك. جميع المستهلكين مسبوقة ب "databricks_". الافتراضي هو سلسلة فارغة. |
registerConsumerTimeoutInterval |
STRING |
الحد الأقصى لمهلة الانتظار حتى يتم تسجيل مستهلك Kinesis EFO مع دفق Kinesis قبل طرح خطأ. الإعداد الافتراضي هو "300s". |
requireConsumerDeregistration |
BOOLEAN |
true لإلغاء تسجيل مستهلك EFO عند إنهاء الاستعلام. القيمة الافتراضية هي false . |
deregisterConsumerTimeoutInterval |
STRING |
الحد الأقصى لمهلة الانتظار حتى يتم إلغاء تسجيل مستهلك Kinesis EFO مع دفق Kinesis قبل طرح خطأ. الإعداد الافتراضي هو "300s". |
consumerRefreshInterval |
STRING |
الفاصل الزمني الذي يتم فيه فحص المستهلك وتحديثه. الإعداد الافتراضي هو "300s". |
يتم استخدام الوسيطات التالية للتحكم في معدل نقل القراءة وزمن الانتقال ل Kinesis:
المعلمة | النوع | الوصف |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
اختياري، مع افتراضي 10000 سجل للقراءة لكل طلب واجهة برمجة تطبيقات إلى Kinesis. |
maxFetchRate |
STRING |
مدى سرعة الإحضار المسبق للبيانات لكل جزء. قيمة بين "1.0" و"2.0" يتم قياسها بالميغابايت/ثانية. الإعداد الافتراضي هو "1.0". |
minFetchPeriod |
STRING |
الحد الأقصى لوقت الانتظار بين محاولات الجلب المسبق المتتالية. الإعداد الافتراضي هو "400 مللي ثانية". |
maxFetchDuration |
STRING |
الحد الأقصى لمدة تخزين البيانات الجديدة المحفوفة مسبقا مؤقتا. الإعداد الافتراضي هو "10s". |
fetchBufferSize |
STRING |
كمية البيانات للمشغل التالي. الإعداد الافتراضي هو '20 غيغابايت'. |
shardsPerTask |
INTEGER (>0) |
عدد أجزاء Kinesis التي يتم الإحضار المسبق منها بالتوازي لكل مهمة spark. القيمة الافتراضية هي 5. |
shardFetchinterval |
STRING |
عدد المرات التي يتم فيها الاستقصاء عن إعادة التقسيم. الإعداد الافتراضي هو '1s'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
الحد الذي يحدث عنده الاندماج التلقائي. الافتراضي هو 10,000,000. |
coalesce |
BOOLEAN |
true لدمج الطلبات مسبقة الجلب. الافتراضي هو true . |
coalesceBinSize |
INTEGER (>0) |
حجم الكتلة التقريبي بعد الاندماج. الافتراضي هو 128,000,000. |
reuseKinesisClient |
BOOLEAN |
true لإعادة استخدام عميل Kinesis المخزن في ذاكرة التخزين المؤقت. الإعداد الافتراضي هو true باستثناء مجموعة PE. |
clientRetries |
INTEGER (>0) |
عدد مرات إعادة المحاولة في سيناريو إعادة المحاولة. القيمة الافتراضية هي 5. |
المرتجعات
جدول سجلات Kinesis مع المخطط التالي:
الاسم | نوع البيانات | بدون قيمة | قياسي | الوصف |
---|---|---|---|---|
partitionKey |
STRING |
لا | مفتاح يستخدم لتوزيع البيانات بين أجزاء الدفق. ستتم قراءة جميع سجلات البيانات بنفس مفتاح القسم من نفس الجزء. | |
data |
BINARY |
لا | حمولة بيانات kinesis، مرمزة ب base-64. | |
stream |
STRING |
لا | اسم الدفق الذي تمت قراءة البيانات منه. | |
shardId |
STRING |
لا | معرف فريد للجزء الذي تمت قراءة البيانات منه. | |
sequenceNumber |
BIGINT |
لا | المعرف الفريد للسجل داخل الجزء الخاص به. | |
approximateArrivalTimestamp |
TIMESTAMP |
لا | الوقت التقريبي الذي تم فيه إدراج السجل في الدفق. |
تشكل الأعمدة (stream, shardId, sequenceNumber)
مفتاحا أساسيا.
الأمثلة
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');