Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şunlar için geçerlidir:
Databricks SQL
Bir akış tablosu, akış veya artımlı veri işleme için ek destek içeren bir Delta tablosu oluşturur.
Akış tabloları yalnızca Lakeflow Spark Bildirimli İşlem Hatlarında ve Unity Kataloğu ile Databricks SQL'de desteklenir. Desteklenen Databricks Runtime işlemlerinde bu komutu çalıştırmak yalnızca söz dizimini ayrıştırmaktadır. Bkz SQL ile Lakeflow Spark Deklaratif İşlem Hatları kodu geliştirme.
Sözdizimi
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
Parametreler
REFRESH
Belirtilirse, tabloyu sorguda tanımlanan kaynaklardan sağlanan en son verilerle yeniler. Yalnızca sorgu başlamadan önce gelen yeni veriler işlenir. Komutun yürütülmesi sırasında kaynaklara eklenen yeni veriler bir sonraki yenilemeye kadar yoksayılır. CREATE OR REFRESH yenileme işlemi tamamen bildirim temellidir. Yenileme komutu özgün tablo oluşturma deyimindeki tüm meta verileri belirtmezse belirtilmeyen meta veriler silinir.
EĞER YOKSA
Akış tablosunu mevcut değilse oluşturur. Bu isimde bir tablo zaten varsa,
CREATE STREAMING TABLEdeyimi görmezden gelinir.IF NOT EXISTSveyaOR REFRESH'in en fazla birini belirtebilirsiniz.-
Oluşturulacak tablonun adı. Ad bir zamansal belirtim veya seçenek belirtimi içermemelidir. Ad uygun değilse, tablo geçerli şemada oluşturulur.
tablo_spesifikasyonu
Bu isteğe bağlı yan tümcesi sütunların listesini, türlerini, özelliklerini, açıklamalarını ve sütun kısıtlamalarını tanımlar.
Tablo şemasında sütun tanımlamazsanız
AS querybelirtmeniz gerekir.-
Sütun adı için benzersiz bir isim.
-
Sütunun veri türünü belirtir.
NOT NULL
Belirtilirse sütun
NULLdeğerleri kabul etmez.YORUM column_comment
Sütunu tanımlayan bir dize sabiti.
-
Önemli
Bu özellik Genel Önizlemededir.
Akış tablosundaki sütuna birincil anahtar veya yabancı anahtar kısıtlaması ekler. kısıtlamalar
hive_metastorekataloğundaki tablolar için desteklenmez. -
Hassas verileri anonim hale getirmek için bir sütun maskesi işlevi ekler. Bu sütundaki sonraki tüm sorgular, sütunun özgün değeri yerine bu işlevi sütun üzerinde değerlendirmenin sonucunu alır. İnce detaylı erişim kontrolüne yönelik amaçlar için, işlevin çağıran kullanıcının kimliğini veya grup üyeliklerini inceleyerek değeri gizleyip gizlememeye karar vermesi bakımından faydalı olabilir.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ İHLAL DURUMUNDA { FAIL UPDATE | SATIRI SİL } ]
Tabloya veri kalitesi beklentileri ekler. Bu veri kalitesi beklentileri zaman içinde izlenebilir ve akış tablosunun olay günlüğü aracılığıyla erişilebilir.
FAIL UPDATEbir beklenti, hem tabloyu oluştururken hem de tabloyu yenilerken işlemenin başarısız olmasına neden olur. Beklenti karşılanmadığında,DROP ROWsatırın tamamının silinmesine neden olur.expectation_exprdeğişmez değerlerden, tablo içindeki sütun tanımlayıcılarından ve aşağıdakiler dışında belirleyici, yerleşik SQL işlevlerinden veya işleçlerinden oluşabilir:-
Toplama işlevleri
- Analitik pencere işlevleri
- Derecelendirme penceresi işlevleri
- Tablo değer döndüren oluşturucu fonksiyonlar
Ayrıca
exprherhangi bir alt sorgu içermemelidir.-
Toplama işlevleri
-
Önemli
Bu özellik Genel Önizlemededir.
Bir akış tablosuna bilgi amacı taşıyan birincil anahtar veya bilgi amacı taşıyan yabancı anahtar kısıtlamaları ekler.
hive_metastorekataloğundaki tablolar için anahtar kısıtlamaları desteklenmez.
-
-
tablo koşulları
İsteğe bağlı olarak bölümleme, açıklamalar, kullanıcı tanımlı özellikler ve yeni tablo için yenileme zamanlaması belirtin. Her alt yan tümce yalnızca bir kez belirtilebilir.
-
Tabloyu bölümleme ölçütü olarak tablo sütunlarının isteğe bağlı listesi.
Not
Sıvı kümeleme, kümeleme için esnek, iyileştirilmiş bir çözüm sağlar. Akış tabloları için
CLUSTER BYyerinePARTITIONED BYkullanmayı göz önünde bulundurun. -
Sütunların bir alt kümesine göre kümeleme yapması için isteğe bağlı bir ifade. ile
CLUSTER BY AUTOotomatik sıvı kümelemeyi kullanın ve Databricks sorgu performansını iyileştirmek için kümeleme anahtarlarını akıllı bir şekilde seçer. Bkz Tablolar için sıvı kümeleme kullanma.Sıvı kümeleme ile
PARTITIONED BYbirleştirilemez. YORUM table_comment
Tabloyu açıklamak için
STRINGsabiti.VARSAYıLAN HARMANLAMA UTF8_BINARY
Şunun için geçerlidir:
Databricks SQL
Databricks Runtime 17.1 ve üzeriAkış tablosunun varsayılan harmanlamasını
UTF8_BINARYolarak zorlar. Bu madde, tablonun oluşturulduğu şemadaUTF8_BINARYdışındaki bir varsayılan karşılaştırma düzeni varsa zorunludur. Akış tablosunun varsayılan harmanlaması,queryiçinde ve sütun türleri için varsayılan harmanlama olarak kullanılır.-
İsteğe bağlı olarak bir veya daha fazla kullanıcı tanımlı özellik ayarlar.
Bu deyimi çalıştırmak için kullanılan Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanı kanalını belirtmek için bu ayarı kullanın.
pipelines.channelözelliğinin değerini"PREVIEW"veya"CURRENT"olarak ayarlayın. Varsayılan değer şudur:"CURRENT". Lakeflow Spark Bildirimli İşlem Hatları kanalları hakkında daha fazla bilgi için bkz. Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanı kanalları. program
Zamanlama bir
SCHEDULEdeyim veya deyimTRIGGERolabilir.TAKVİM [ REFRESH ] program_maddesi
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }Düzenli aralıklarla gerçekleşen bir yenileme zamanlamak için
EVERYsöz dizimini kullanın.EVERYsöz dizimi belirtilirse, akış tablosu veya gerçekleştirilmiş görünüm, sağlanan değere göre düzenli aralıklarla yenilenir; örneğin,HOUR,HOURS,DAY,DAYS,WEEKveyaWEEKS. Aşağıdaki tabloda,numberiçin kabul edilen tamsayı değerleri listelenmiştir.Zaman birimi Tamsayı değeri HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 Not
Dahil edilen zaman biriminin tekil ve çoğul biçimleri sematik olarak eşdeğerdir.
CRON cron_string [ AT TIME ZONE timezone_id ]quartz cron değeri kullanarak yenileme işlemi zamanlamak için. Geçerli time_zone_values kabul edilir.
AT TIME ZONE LOCALdesteklenmez.AT TIME ZONEmevcut değilse, oturum saat dilimi kullanılır.AT TIME ZONEyoksa ve oturum saat dilimi ayarlanmadıysa bir hata oluşur.SCHEDULE, ile eşanlamlı olarak eşdeğerdirSCHEDULE REFRESH.
Zamanlama komutun
CREATEbir parçası olarak sağlanabilir. oluşturma işleminden sonra akış tablosunun zamanlamasını değiştirmek için ALTER STREAMING TABLE kullanın veyaCREATE OR REFRESHyan tümcesiyleSCHEDULEkomutunu çalıştırın.TETIKLEYICI AÇıK UPDATE [ EN ÇOK HER trigger_interval ]
Önemli
Bu
TRIGGER ON UPDATEözellik Beta sürümündedir.İsteğe bağlı olarak, bir yukarı akış veri kaynağı güncelleştirildiğinde tabloyu yenilemek için en fazla dakikada bir ayarlayın. yenilemeler arasında en az bir süre gerektirecek şekilde değerini
AT MOST EVERYayarlayın.Yukarı akış veri kaynakları dış veya yönetilen Delta tabloları (gerçekleştirilmiş görünümler veya akış tabloları dahil) veya bağımlılıkları desteklenen tablo türleriyle sınırlı olan yönetilen görünümler olmalıdır.
Dosya olaylarının etkinleştirilmesi tetikleyicileri daha performanslı hale getirir ve tetikleyici güncelleştirmelerindeki sınırların bazılarını artırır.
trigger_interval, en az 1 dakikalık bir INTERVAL deyimidir.TRIGGER ON UPDATEaşağıdaki sınırlamalara sahiptir- TRIGGER ON UPDATEkullanılırken akış tablosu başına 10'dan fazla yukarı akış veri kaynağı yoktur.
- TRIGGER ON UPDATEile en fazla 1000 akış tablosu veya gerçekleştirilmiş görünüm belirtilebilir.
- Yan
AT MOST EVERYtümcesi varsayılan olarak 1 dakikadır ve 1 dakikadan kısa olamaz.
-
-
Tabloya bir satır filtresi işlevi ekler. Bu tablodan sonraki tüm sorgular, işlevin TRUE değerini değerlendirdiği satırların bir alt kümesini alır. Bu, işlevin belirli satırları filtreleyip filtrelememeye karar vermek için çağıran kullanıcının kimlik veya grup üyeliklerini incelediği ayrıntılı erişim denetimi amaçları için yararlı olabilir.
AS sorgusu
Bu yan tümce,
queryverilerini kullanarak tabloyu doldurur. Bu sorgu bir akış sorgusu olmalıdır. Bu, artımlı olarak işlemek istediğiniz herhangi bir ilişkiye anahtar sözcüğünü ekleyerekSTREAMelde edilebilir. Birlikte birqueryve birtable_specificationbelirttiğinizde,table_specificationiçinde belirtilen tablo şemasıquerytarafından döndürülen tüm sütunları içermelidir, aksi takdirde bir hata alırsınız.table_specificationbelirtilen ancakquerytarafından döndürülmeyen tüm sütunlar sorgulandığındanulldeğerleri döndürür.
Akış tabloları ile diğer tablolar arasındaki farklar
Akış tabloları, büyüyen bir veri kümesini işlerken her satırı yalnızca bir kez işlemek üzere tasarlanmış durum bilgisi olan tablolardır. Çoğu veri kümesi zaman içinde sürekli büyüdüğü için akış tabloları çoğu alım iş yükü için iyidir. Akış tabloları, veri güncelliği ve düşük gecikme süresi gerektiren işlem hatları için idealdir. Akış tabloları, yeni veriler geldikçe artımlı olarak hesaplanabilir ve her güncelleştirmede tüm kaynak verileri tam olarak yeniden derlemeye gerek kalmadan sonuçları güncel tutarak büyük ölçekli dönüşümler için de yararlı olabilir. Akış tabloları yalnızca ekli veri kaynakları için tasarlanmıştır.
Akış tabloları, sorguda sağlanan kaynaklarda bulunan en son verileri işleyen REFRESHgibi ek komutları kabul eder. Sağlanan sorguda yapılan değişiklikler, daha önce işlenen veriler üzerinde değil, yalnızca REFRESH çağrısı yapılarak yeni veriler üzerinde yansıtılır. Değişiklikleri mevcut verilere de uygulamak için REFRESH TABLE <table_name> FULL komutunu çalıştırarak bir FULL REFRESH gerçekleştirmeniz gerekir. Tam yenilemeler, kaynakta bulunan tüm verileri en son tanım ile yeniden işler. Verilerin geçmişinin tamamını tutmayan veya Kafka gibi kısa saklama süreleri olan kaynaklarda tam yenilemelerin çağrılması önerilmez çünkü tam yenileme mevcut verileri kısaltmaktadır. Veriler artık kaynakta kullanılamıyorsa eski verileri kurtaramayabilirsiniz.
Satır filtreleri ve sütun maskeleri
Satır filtreleri, tablo taraması satırları her getirdiğinde filtre olarak uygulanan bir işlev belirtmenize olanak tanır. Bu filtreler, izleyen sorguların yalnızca filtre koşulunun true olarak değerlendirildiği satırları döndürmesini sağlar.
Sütun maskeleri, tablo taraması satırları her getirdiğinizde sütunun değerlerini maskelemenizi sağlar. Bu sütunu içeren gelecekteki tüm sorgular, işlevi sütun üzerinde değerlendirerek sütunun özgün değerini değiştirmenin sonucunu alır.
Satır filtrelerini ve sütun maskelerini kullanma hakkında daha fazla bilgi için bkz. Satır filtreleri ve sütun maskeleri.
Satır Filtrelerini ve Sütun Maskelerini Yönetme
Akış tablolarındaki satır filtreleri ve sütun maskeleri, CREATE OR REFRESH deyimi aracılığıyla eklenmeli, güncellenmeli veya bırakılmalıdır.
Davranış
-
Definer olarak yenile:
CREATE OR REFRESHveyaREFRESHdeyimleri bir akış tablosunu yenilediğinde, satır filtresi işlevleri tanımlayıcının (tablo sahibi olarak) haklarıyla çalışır. Bu, tablo yenilemesinin akış tablosunu oluşturan kullanıcının güvenlik bağlamını kullandığı anlamına gelir. -
Sorgu: Çoğu filtre, tanımlayıcının haklarıyla çalıştırılırken, kullanıcı bağlamını (ve
CURRENT_USERgibiIS_MEMBER) denetleen işlevler özel durumlardır. Bu işlevler çağırıcı olarak çalışır. Bu yaklaşım, geçerli kullanıcının bağlamını temel alarak kullanıcıya özgü veri güvenliği ve erişim denetimlerini zorunlu kılıp uygular.
Gözlemlenebilirlik
Belirli bir akış tablosuna uygulanan mevcut satır filtrelerini ve sütun maskelerini incelemek için DESCRIBE EXTENDED, INFORMATION_SCHEMAveya Katalog Gezgini'ni kullanın. Bu işlevsellik, kullanıcıların akış tablolarındaki veri erişimi ve koruma ölçülerini denetlemesine ve gözden geçirmesine olanak tanır.
Sınırlamalar
- En son verileri almak için yalnızca tablo sahipleri akış tablolarını yenileyebilir.
- akış tablolarında
ALTER TABLEkomutlara izin verilmez. Tablonun tanımı ve özellikleriCREATE OR REFRESHveya ALTER STREAMING TABLE deyimiyle değiştirilmelidir. - tablo şemasını
INSERT INTOveMERGEgibi DML komutları aracılığıyla geliştirme desteklenmez. - Akış tablolarında aşağıdaki komutlar desteklenmez:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- Delta Paylaşımı desteklenmez.
- Tabloyu yeniden adlandırmak veya sahibini değiştirmek desteklenmemektedir.
-
PRIMARY KEYveFOREIGN KEYgibi tablo kısıtlamaları,hive_metastorekatalogundaki akış tablolarında desteklenmez. - Oluşturulan sütunlar, kimlik sütunları ve varsayılan sütunlar desteklenmez.
Örnekler
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')