Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dotyczy:
Databricks SQL
Tworzy tabelę przesyłania strumieniowego , tabelę delty z dodatkową obsługą przesyłania strumieniowego lub przetwarzania danych przyrostowych.
Tabele przesyłania strumieniowego są obsługiwane tylko w deklaratywnych potokach Lakeflow Spark i w Databricks SQL z Unity Catalog. Uruchomienie tego polecenia w obsługiwanym środowisku Databricks Runtime oblicza tylko składnię. Zobacz Tworzenie deklaratywnego kodu potoków w Lakeflow Spark za pomocą SQL.
Składnia
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
Parametry
REFRESH
Jeśli zostanie określona, odświeża tabelę z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu. Tylko nowe dane, które docierają przed rozpoczęciem zapytania, są przetwarzane. Nowe dane dodawane do źródeł podczas wykonywania polecenia są ignorowane do następnego odświeżania. Operacja odświeżania z polecenia CREATE OR REFRESH jest w pełni deklaratywna. Jeśli polecenie odświeżania nie określa wszystkich metadanych z oryginalnej instrukcji tworzenia tabeli, nieokreślone metadane zostaną usunięte.
JEŚLI NIE ISTNIEJE
Tworzy tabelę przesyłania strumieniowego, jeśli nie istnieje. Jeśli tabela o tej nazwie już istnieje, instrukcja
CREATE STREAMING TABLEjest ignorowana.Możesz określić co najwyżej jeden z
IF NOT EXISTSlubOR REFRESH.-
Nazwa tabeli do utworzenia. Nazwa nie może zawierać specyfikacji czasowej ani specyfikacji opcji. Jeśli nazwa nie jest kwalifikowana, tabela zostanie utworzona w bieżącym schemacie.
specyfikacja_tabeli
Ta klauzula opcjonalna definiuje listę kolumn, ich typów, właściwości, opisów i ograniczeń kolumn.
Jeśli nie zdefiniujesz kolumn w schemacie tabeli, musisz określić
AS query.-
Unikatowa nazwa kolumny.
-
Określa typ danych kolumny.
NOT NULL
Jeśli jest to określone, kolumna nie akceptuje wartości
NULL.KOMENTARZ KOLUMNY
Literał tekstowy opisujący kolumnę.
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje ograniczenie klucza podstawowego lub klucza obcego do kolumny w tabeli przesyłania strumieniowego. Ograniczenia nie są obsługiwane w przypadku tabel w wykazie
hive_metastore. -
Dodaje funkcję maski kolumn do anonimowości poufnych danych. Wszystkie kolejne zapytania z tej kolumny otrzymują wynik oceny tej funkcji w kolumnie zamiast oryginalnej wartości kolumny. Może to być przydatne w celach szczegółowej kontroli dostępu, w których funkcja może sprawdzić tożsamość lub członkostwo w grupach użytkownika wywołującego, aby zdecydować, czy zredagować wartość.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ W PRZYPADKU NARUSZENIA { FAIL UPDATE | DROP ROW } ]
Dodaje wymagania dotyczące jakości danych do tabeli. Te oczekiwania dotyczące jakości danych można śledzić w czasie i uzyskiwać do nich dostęp za pośrednictwem dziennika zdarzeń tabeli przesyłania strumieniowego. Oczekiwanie
FAIL UPDATEpowoduje niepowodzenie przetwarzania podczas tworzenia tabeli, a także odświeżania tabeli. OczekiwanieDROP ROWpowoduje usunięcie całego wiersza, jeżeli oczekiwanie nie zostanie spełnione.expectation_exprmogą składać się z literałów, identyfikatorów kolumn w tabeli oraz deterministycznych, wbudowanych funkcji LUB operatorów SQL z wyjątkiem:-
Funkcje agregujące
- funkcje okien analitycznych
- Funkcje okna rankingowego
- Funkcje generatora wartości tabeli
Ponadto
exprnie może zawierać żadnego podzapytania.-
Funkcje agregujące
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje informacyjne ograniczenia klucza podstawowego lub klucza obcego do tabeli przesyłania strumieniowego. Ograniczenia klucza nie są obsługiwane w przypadku tabel w wykazie
hive_metastore.
-
-
table_clauses
Opcjonalnie określ partycjonowanie, komentarze, właściwości zdefiniowane przez użytkownika i harmonogram odświeżania nowej tabeli. Każda klauzula podrzędna może być określona tylko raz.
-
Opcjonalna lista kolumn, według których tabelę można partycjonować.
Uwaga
Liquid clustering zapewnia elastyczne, zoptymalizowane rozwiązanie do klastrowania. Rozważ użycie
CLUSTER BYzamiastPARTITIONED BYdla tabel przesyłania strumieniowego. -
Klauzula opcjonalna do klastrowania według podzestawu kolumn. Użyj automatycznego klastrowania liquid z usługą
CLUSTER BY AUTO, a usługa Databricks inteligentnie wybiera klucze klastrowania, aby zoptymalizować wydajność zapytań. Zobacz Używaj płynnego grupowania dla tabel.Klastrowanie liquid nie może być łączone z
PARTITIONED BY. Komentarz do tabeli
Literał
STRING, który opisuje tabelę.DOMYŚLNE SORTOWANIE UTF8_BINARY
Dotyczy:
, sprawdź, czy usługa SQL databricks
Databricks Runtime 17.1 lub nowszaWymusza ustawienie domyślnego porządku sortowania tabeli przesyłania strumieniowego na
UTF8_BINARY. Ta klauzula jest obowiązkowa, jeśli schemat, w którym jest tworzona tabela, ma domyślne sortowanie inne niżUTF8_BINARY. Domyślne sortowanie tabeli przesyłania strumieniowego jest używane jako domyślne sortowanie w obrębiequeryoraz dla typów kolumn.-
Opcjonalnie ustawia co najmniej jedną właściwość zdefiniowaną przez użytkownika.
To ustawienie służy do określania kanału wykonawczego Spark Lakeflow używanego do wykonania tego polecenia. Ustaw wartość właściwości
pipelines.channelna wartość"PREVIEW"lub"CURRENT". Domyślna wartość to"CURRENT". Aby uzyskać więcej informacji na temat kanałów deklaratywnych potoków platformy Spark w usłudze Lakeflow, zobacz Kanały środowiska uruchomieniowego potoków deklaratywnych platformy Lakeflow. harmonogram
Harmonogram może być instrukcją
SCHEDULElub instrukcjąTRIGGER.HARMONOGRAM [ REFRESH ] klauzula_harmonogramu
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }Aby zaplanować odświeżanie okresowo, użyj składni
EVERY. Jeśli określono składnięEVERY, tabela strumieniowa lub zmaterializowany widok jest okresowo odświeżany w określonym interwale na podstawie podanej wartości, takiej jakHOUR,HOURS,DAY,DAYS,WEEKlubWEEKS. W poniższej tabeli wymieniono zaakceptowane wartości całkowite dlanumber.Jednostka czasu Wartość całkowita HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 Uwaga
Liczba pojedyncza i mnoga dołączonej jednostki czasowej są semantycznie równoważne.
CRON cron_string [ AT TIME ZONE timezone_id ]Aby zaplanować odświeżanie przy użyciu wartości cron quartz. Akceptowane są prawidłowe time_zone_values .
AT TIME ZONE LOCALnie jest obsługiwana.Jeśli
AT TIME ZONEjest nieobecny, używana jest strefa czasowa sesji. JeśliAT TIME ZONEjest nieobecny, a strefa czasowa sesji nie jest ustawiona, zostanie zgłoszony błąd.SCHEDULEjest semantycznie równoważne .SCHEDULE REFRESH
Harmonogram można podać w ramach
CREATEpolecenia . Użyj ALTER STREAMING TABLE lub uruchom polecenieCREATE OR REFRESHz klauzuląSCHEDULE, aby zmienić harmonogram tabeli przesyłania strumieniowego po utworzeniu.WYZWALACZ NA UPDATE [ CO NAJWYŻEJ CO TRIGGER_INTERVAL ]
Ważne
Funkcja
TRIGGER ON UPDATEjest dostępna w wersji beta.Opcjonalnie ustaw tabelę tak, aby odświeżyła się po zaktualizowaniu nadrzędnego źródła danych, co najwyżej co minutę. Ustaw wartość ,
AT MOST EVERYaby wymagać co najmniej minimalnego czasu między odświeżeniami.Nadrzędne źródła danych muszą być zewnętrznymi lub zarządzanymi tabelami delty (w tym zmaterializowanymi widokami lub tabelami przesyłania strumieniowego) albo widokami zarządzanymi, których zależności są ograniczone do obsługiwanych typów tabel.
Włączenie zdarzeń plików może sprawić, że wyzwalacze będą bardziej wydajne i zwiększa niektóre limity dotyczące aktualizacji wyzwalacza.
Jest
trigger_intervalto instrukcja INTERVAL , która jest co najmniej 1 minuta.TRIGGER ON UPDATEma następujące ograniczenia- Nie więcej niż 10 nadrzędnych źródeł danych na tabelę przesyłania strumieniowego w przypadku korzystania z funkcji TRIGGER ON UPDATE.
- Maksymalnie 1000 tabel przesyłania strumieniowego lub zmaterializowanych widoków można określić za pomocą TRIGGER ON UPDATE.
- Klauzula
AT MOST EVERYjest domyślnie ustawiona na 1 minutę i nie może być mniejsza niż 1 minuta.
-
Z klauzulą ROW FILTER
Dodaje do tabeli funkcję filtru wierszy. Wszystkie kolejne zapytania z tej tabeli otrzymują podzbiór wierszy, w których funkcja zwraca wartość logiczną TRUE. Może to być przydatne w celach szczegółowej kontroli dostępu, w których funkcja może sprawdzić tożsamość lub członkostwo w grupach użytkownika wywołującego, aby zdecydować, czy filtrować niektóre wiersze.
-
Ta klauzula wypełnia tabelę przy użyciu danych z
query. To zapytanie musi być zapytaniem przesyłanym strumieniowo. Można to osiągnąć, dodającSTREAMsłowo kluczowe do dowolnej relacji, którą chcesz przetwarzać przyrostowo. Po określeniuqueryitable_specificationrazem schemat tabeli określony wtable_specificationmusi zawierać wszystkie kolumny zwrócone przezquery, w przeciwnym razie zostanie wyświetlony błąd. Wszystkie kolumny określone wtable_specification, ale nie zwracane przezqueryzwracają wartościnullpodczas wykonywania zapytania.
Różnice między tabelami przesyłania strumieniowego a innymi tabelami
Tabele strumieniowane to tabele stanowe, specjalnie zaprojektowane do przechwytywania każdego wiersza tylko raz podczas przetwarzania rosnącego zbioru danych. Ponieważ większość zestawów danych stale rośnie z czasem, tabele przesyłania strumieniowego są dobre dla większości obciążeń związanych z pozyskiwaniem danych. Tabele przesyłania strumieniowego są optymalne dla przepływów danych, które wymagają świeżości danych i niskiej latencji. Tabele przesyłania strumieniowego mogą być również przydatne w przypadku transformacji na dużą skalę, ponieważ wyniki mogą być obliczane przyrostowo w miarę nadejścia nowych danych, zapewniając aktualność wyników bez konieczności pełnej ponownej kompilacji wszystkich danych źródłowych przy każdej aktualizacji. Tabele przesyłania strumieniowego są przeznaczone dla źródeł danych, do których dane są jedynie dołączane.
Tabele przesyłania strumieniowego akceptują dodatkowe polecenia, takie jak REFRESH, które przetwarzają najnowsze dane dostępne w źródłach podanych w zapytaniu. Zmiany w podanym zapytaniu są odzwierciedlane tylko w nowych danych przez wywołanie REFRESH, nie wpływając na wcześniej przetworzone dane. Aby zastosować zmiany w istniejących danych, należy wykonać REFRESH TABLE <table_name> FULL oraz FULL REFRESH. Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Filtry wierszy i maski kolumn
Filtry wierszy umożliwiają określenie funkcji, która ma zastosowanie jako filtr za każdym razem, gdy skanowanie tabeli pobiera wiersze. Te filtry zapewniają, że kolejne zapytania zwracają tylko wiersze, dla których predykat filtru daje wartość true.
Maski kolumn umożliwiają maskowanie wartości kolumny za każdym razem, gdy skanowanie tabeli pobiera wiersze. Wszystkie przyszłe zapytania obejmujące tę kolumnę otrzymają wynik oceny funkcji w kolumnie, zastępując oryginalną wartość kolumny.
Aby uzyskać więcej informacji na temat używania filtrów wierszy i masek kolumn, zobacz Filtry wierszy i maski kolumn.
Zarządzanie filtrami wierszy i maskami kolumn
Filtry wierszy i maski kolumn w tabelach przesyłania strumieniowego powinny być dodawane, aktualizowane lub porzucane przez instrukcję CREATE OR REFRESH.
Zachowanie
-
Odśwież jako definiujący: Gdy instrukcje
CREATE OR REFRESHlubREFRESHodświeżają tabelę strumieniową, funkcje filtrowania wierszy są uruchamiane z przywilejami definiującego (jako właściciel tabeli). Oznacza to, że odświeżanie tabeli używa kontekstu zabezpieczeń użytkownika, który utworzył tabelę przesyłania strumieniowego. -
Zapytanie: Podczas gdy większość filtrów jest uruchamiana z prawami definiowanego, funkcje sprawdzające kontekst użytkownika (takie jak
CURRENT_USERiIS_MEMBER) są wyjątkami. Te funkcje działają z uprawnieniami wywołującego. Takie podejście wymusza zabezpieczenia danych specyficzne dla użytkownika i mechanizmy kontroli dostępu na podstawie kontekstu bieżącego użytkownika.
Obserwowalność
Użyj DESCRIBE EXTENDED, INFORMATION_SCHEMAlub Eksploratora wykazu, aby zbadać istniejące filtry wierszy i maski kolumn, które mają zastosowanie do danej tabeli przesyłania strumieniowego. Ta funkcja umożliwia użytkownikom przeprowadzanie inspekcji i przeglądania środków dostępu do danych i ochrony w tabelach przesyłania strumieniowego.
Ograniczenia
- Tylko właściciele tabel mogą odświeżać tabele przesyłania strumieniowego, aby uzyskać najnowsze dane.
-
ALTER TABLEpolecenia są niedozwolone w tabelach przesyłania strumieniowego. Definicja i właściwości tabeli powinny zostać zmienione za pomocą instrukcjiCREATE OR REFRESHlub ALTER STREAMING TABLE. - Ewolucja schematu tabeli za pomocą poleceń DML, takich jak
INSERT INTO, iMERGEnie jest obsługiwana. - Następujące komendy nie są obsługiwane w tabelach przesyłania strumieniowego:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- Delta Sharing nie jest obsługiwane.
- Zmiana nazwy tabeli lub zmiana właściciela nie jest obsługiwana.
- Ograniczenia, takie jak
PRIMARY KEYiFOREIGN KEY, nie są obsługiwane w przypadku tabel strumieniowych w wykaziehive_metastore. - Wygenerowane kolumny, kolumny tożsamości i kolumny domyślne nie są obsługiwane.
Przykłady
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')