إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على:
Databricks SQL
إنشاء جدول دفق، جدول دلتا مع دعم إضافي للتدفق أو معالجة البيانات المتزايدة.
يتم دعم جداول الدفق فقط في جداول Delta Live وعلى Databricks SQL باستخدام كتالوج Unity. يؤدي تشغيل هذا الأمر على حساب Databricks Runtime المدعوم إلى تحليل بناء الجملة فقط. راجع تطوير التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية باستخدام SQL.
بناء الجملة
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
المعلمات
تحديث
إذا تم تحديده، فقم بتحديث الجدول بأحدث البيانات المتوفرة من المصادر المعرفة في الاستعلام. تتم معالجة البيانات الجديدة التي تصل قبل بدء الاستعلام فقط. يتم تجاهل البيانات الجديدة التي تتم إضافتها إلى المصادر أثناء تنفيذ الأمر حتى التحديث التالي. عملية التحديث من CREATE OR REFRESH تعريفية بالكامل. إذا لم يحدد أمر التحديث كافة بيانات التعريف من عبارة إنشاء الجدول الأصلية، يتم حذف بيانات التعريف غير المحددة.
إذا لم يكن موجودا
إنشاء جدول الدفق إذا لم يكن موجودا. إذا كان هناك جدول بهذا الاسم موجود بالفعل، يتم تجاهل العبارة
CREATE STREAMING TABLE.يمكنك تحديد واحد على الأكثر من
IF NOT EXISTSأوOR REFRESH.-
اسم الجدول الذي سيتم إنشاؤه. يجب ألا يتضمن الاسم مواصفات زمنية أو مواصفات خيارات. إذا لم يكن الاسم مؤهلا، يتم إنشاء الجدول في المخطط الحالي.
table_specification
تحدد هذه العبارة الاختيارية قائمة الأعمدة وأنواعها وخصائصها ووصفها وقيود الأعمدة.
إذا لم تقم بتعريف الأعمدة في مخطط الجدول، فيجب عليك تحديد
AS query.-
اسم فريد للعمود.
-
تحديد نوع بيانات العمود.
ليس فارغا
إذا تم تحديد العمود لا يقبل
NULLالقيم.column_comment التعليق
سلسلة حرفية لوصف العمود.
-
إضافة مفتاح أساسي أو قيد مفتاح خارجي إلى العمود في جدول دفق. القيود غير معتمدة للجداول في الكتالوج
hive_metastore. -
إضافة دالة قناع عمود إلى إخفاء البيانات الحساسة. تتلقى جميع الاستعلامات اللاحقة من هذا العمود نتيجة تقييم تلك الدالة فوق العمود بدلا من القيمة الأصلية للعمود. يمكن أن يكون هذا مفيدا لأغراض التحكم في الوصول الدقيقة حيث يمكن للدالة فحص الهوية أو عضويات المجموعة للمستخدم الذي يستدعي لتحديد ما إذا كان سيتم تنقيح القيمة.
القيد expectation_name المتوقع (expectation_expr) [ عند الانتهاك { فشل التحديث | DROP ROW } ]
يضيف توقعات جودة البيانات إلى الجدول. يمكن تعقب توقعات جودة البيانات هذه بمرور الوقت والوصول إليها من خلال سجل أحداث جدول الدفق.
FAIL UPDATEيؤدي التوقع إلى فشل المعالجة عند إنشاء الجدول بالإضافة إلى تحديث الجدول.DROP ROWيؤدي التوقع إلى إسقاط الصف بأكمله إذا لم يتم تلبية التوقعات.expectation_exprقد تتكون من القيم الحرفية ومعرفات الأعمدة داخل الجدول ودالات SQL أو عوامل التشغيل الحتمية والمضمنة باستثناء:- دالات التجميع
- دوال النافذة التحليلية
- دالات نافذة تحديد المرتبة
- دالات مولد قيم الجدول
كما
exprيجب ألا يحتوي على أي استعلام فرعي.- دالات التجميع
-
إضافة مفتاح أساسي إعلامي أو قيود مفتاح خارجي إعلامية إلى جدول دفق. القيود الرئيسية غير معتمدة للجداول في الكتالوج
hive_metastore.
-
-
table_clauses
حدد التقسيم والتعليقات والخصائص المعرفة من قبل المستخدم وجدول تحديث للجدول الجديد اختياريا. يمكن تحديد كل عبارة فرعية مرة واحدة فقط.
-
قائمة اختيارية بأعمدة الجدول لتقسيم الجدول حسبها.
table_comment التعليق
STRINGقيمة حرفية لوصف الجدول.-
تعيين خاصية واحدة أو أكثر من الخصائص المعرفة من قبل المستخدم اختياريا.
استخدم هذا الإعداد لتحديد قناة وقت تشغيل Delta Live Tables المستخدمة لتشغيل هذه العبارة. تعيين قيمة الخاصية
pipelines.channelإلى"PREVIEW"أو"CURRENT". القيمة الافتراضية هي"CURRENT". لمزيد من المعلومات حول قنوات Delta Live Tables، راجع قنوات وقت تشغيل Delta Live Tables. جدولة [ تحديث ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }لجدولة تحديث يحدث بشكل دوري، استخدم
EVERYبناء الجملة. إذاEVERYتم تحديد بناء الجملة، يتم تحديث جدول الدفق أو طريقة العرض المجسدة بشكل دوري في الفاصل الزمني المحدد استنادا إلى القيمة المتوفرة، مثلHOURأو .WEEKSHOURSDAYDAYSWEEKيسرد الجدول التالي قيم العدد الصحيح المقبولة لnumber.وحدة الوقت قيمة عدد صحيح HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 إشعار
الأشكال المفردة والجمعية للوحدة الزمنية المضمنة مكافئة دلاليا.
CRON cron_string [ AT TIME ZONE timezone_id ]لجدولة تحديث باستخدام قيمة كوارتز كرون . يتم قبول time_zone_values صالحة.
AT TIME ZONE LOCALغير مدعم.إذا
AT TIME ZONEلم يكن موجودا، يتم استخدام المنطقة الزمنية للجلسة. إذاAT TIME ZONEلم يكن موجودا ولم يتم تعيين المنطقة الزمنية لجلسة العمل، يتم طرح خطأ.SCHEDULEمكافئ دلاليا لSCHEDULE REFRESH.
يمكن توفير الجدول كجزء من
CREATEالأمر. استخدم ALTER STREAMING TABLE أو قم بتشغيلCREATE OR REFRESHالأمر معSCHEDULEعبارة لتغيير جدول الدفق بعد الإنشاء.عبارة WITH ROW FILTER
إضافة دالة عامل تصفية صف إلى الجدول. تتلقى جميع الاستعلامات اللاحقة من هذا الجدول مجموعة فرعية من الصفوف حيث يتم تقييم الدالة إلى TRUE منطقي. يمكن أن يكون هذا مفيدا لأغراض التحكم في الوصول الدقيقة حيث يمكن للدالة فحص الهوية أو عضويات المجموعة للمستخدم الذي يستدعي لتحديد ما إذا كان سيتم تصفية صفوف معينة.
-
-
تملأ هذه العبارة الجدول باستخدام البيانات من
query. يجب أن يكون هذا الاستعلام استعلام تدفق . يمكن تحقيق ذلك عن طريق إضافةSTREAMالكلمة الأساسية إلى أي علاقة تريد معالجتها بشكل متزايد. عند تحديدqueryوtable_specificationمعا، يجب أن يحتوي مخطط الجدول المحدد فيtable_specificationعلى كافة الأعمدة التي تم إرجاعها بواسطةquery، وإلا تحصل على خطأ. أي أعمدة محددة فيtable_specificationولكن لا يتم إرجاعها بواسطةqueryقيم الإرجاعnullعند الاستعلام.
الاختلافات بين جداول الدفق والجداول الأخرى
جداول الدفق هي جداول ذات حالة، مصممة للتعامل مع كل صف مرة واحدة فقط أثناء معالجة مجموعة بيانات متزايدة. نظرا لأن معظم مجموعات البيانات تنمو باستمرار بمرور الوقت، فإن جداول الدفق جيدة لمعظم أحمال عمل الاستيعاب. تعد جداول الدفق مثالية للبنية الأساسية لبرنامج ربط العمليات التجارية التي تتطلب حداثة البيانات وزمن انتقال منخفض. يمكن أن تكون جداول الدفق مفيدة أيضا للتحويلات الضخمة على نطاق واسع، حيث يمكن حساب النتائج بشكل متزايد مع وصول بيانات جديدة، مما يحافظ على تحديث النتائج دون الحاجة إلى إعادة حساب جميع بيانات المصدر بالكامل مع كل تحديث. تم تصميم جداول الدفق لمصادر البيانات الملحقة فقط.
تقبل جداول الدفق أوامر إضافية مثل REFRESH، والتي تعالج أحدث البيانات المتوفرة في المصادر المتوفرة في الاستعلام. تنعكس التغييرات على الاستعلام المقدم فقط على البيانات الجديدة عن طريق استدعاء REFRESH، وليس البيانات التي تمت معالجتها مسبقا. لتطبيق التغييرات على البيانات الموجودة أيضا، تحتاج إلى تنفيذ REFRESH TABLE <table_name> FULL لتنفيذ FULL REFRESH. يقوم التحديث الكامل بإعادة معالجة جميع البيانات المتوفرة في المصدر بأحدث تعريف. لا ينصح باستدعاء التحديثات الكاملة على المصادر التي لا تحتفظ بمحفوظات البيانات بأكملها أو لديها فترات استبقاء قصيرة، مثل Kafka، حيث يقوم التحديث الكامل باقتطاع البيانات الموجودة. قد لا تتمكن من استرداد البيانات القديمة إذا لم تعد البيانات متوفرة في المصدر.
عوامل تصفية الصفوف وأقنعة الأعمدة
تتيح لك عوامل تصفية الصفوف تحديد دالة تنطبق كعامل تصفية كلما جلب فحص الجدول صفوفا. تضمن عوامل التصفية هذه أن الاستعلامات اللاحقة ترجع فقط الصفوف التي تقيم لها دالة تقييم عامل التصفية إلى true.
تتيح لك أقنعة الأعمدة إخفاء قيم العمود كلما جلب فحص الجدول صفوفا. ستتلقى جميع الاستعلامات المستقبلية التي تتضمن هذا العمود نتيجة تقييم الدالة عبر العمود، واستبدال القيمة الأصلية للعمود.
لمزيد من المعلومات حول كيفية استخدام عوامل تصفية الصفوف وأقنعة الأعمدة، راجع تصفية بيانات الجدول الحساسة باستخدام عوامل تصفية الصفوف وأقنعة الأعمدة.
إدارة عوامل تصفية الصفوف وأقنعة الأعمدة
يجب إضافة عوامل تصفية الصفوف وأقنعة الأعمدة على جداول الدفق أو تحديثها أو إسقاطها من خلال العبارة CREATE OR REFRESH .
سلوك
- تحديث كمحدد: عندما
CREATE OR REFRESHتقوم العبارات أوREFRESHبتحديث جدول دفق، يتم تشغيل دالات عامل تصفية الصف مع حقوق المحدد (كمالك الجدول). وهذا يعني أن تحديث الجدول يستخدم سياق الأمان للمستخدم الذي أنشأ جدول الدفق. - الاستعلام: أثناء تشغيل معظم عوامل التصفية مع حقوق المحدد، فإن الوظائف التي تتحقق من سياق المستخدم (مثل
CURRENT_USERوIS_MEMBER) هي استثناءات. تعمل هذه الدالات كمستدعي. يفرض هذا الأسلوب أمان البيانات الخاصة بالمستخدم وعناصر التحكم في الوصول استنادا إلى سياق المستخدم الحالي.
الملاحظة
استخدم DESCRIBE EXTENDEDأو INFORMATION_SCHEMAأو مستكشف الكتالوج لفحص عوامل تصفية الصفوف وأقنعة الأعمدة الموجودة التي تنطبق على جدول تدفق معين. تسمح هذه الوظيفة للمستخدمين بمراجعة ومراجعة الوصول إلى البيانات ومقاييس الحماية على جداول الدفق.
القيود
يمكن لمالكي الجداول فقط تحديث جداول الدفق للحصول على أحدث البيانات.
ALTER TABLEالأوامر غير مسموح بها على جداول الدفق. يجب تغيير تعريف الجدول وخصائصه من خلالCREATE OR REFRESHعبارة ALTER STREAMING TABLE أو .استعلامات السفر عبر الزمن غير معتمدة.
تطوير مخطط الجدول من خلال أوامر DML مثل
INSERT INTO، وغيرMERGEمدعوم.الأوامر التالية غير معتمدة في جداول الدفق:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
مشاركة دلتا غير معتمدة.
إعادة تسمية الجدول أو تغيير المالك غير معتمد.
قيود الجدول مثل
PRIMARY KEYوFOREIGN KEYغير معتمدة.الأعمدة التي تم إنشاؤها وأعمدة الهوية والأعمدة الافتراضية غير معتمدة.
الأمثلة
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')