Dokumentacja języka SQL usługi Delta Live Tables
Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania SQL usługi Delta Live Tables.
- Aby uzyskać informacje na temat interfejsu API języka Python, zobacz dokumentację języka Python tabel delta live tables.
- Aby uzyskać więcej informacji na temat poleceń SQL, zobacz Dokumentacja języka SQL.
W zapytaniach SQL można używać funkcji zdefiniowanych przez użytkownika (UDF) języka Python, ale przed wywołaniem ich w plikach źródłowych SQL należy zdefiniować te funkcje zdefiniowane przez użytkownika. Zobacz Funkcje skalarne zdefiniowane przez użytkownika — Python.
Ograniczenia
Klauzula nie jest obsługiwana PIVOT
. Operacja pivot
na platformie Spark wymaga chętnego ładowania danych wejściowych w celu obliczenia schematu danych wyjściowych. Ta funkcja nie jest obsługiwana w tabelach delta live.
Tworzenie tabel delta live zmaterializowanego widoku lub tabeli przesyłania strumieniowego
Ta sama podstawowa składnia SQL jest używana podczas deklarowania tabeli przesyłania strumieniowego lub zmaterializowanego widoku (nazywanego LIVE TABLE
również ).
Tabele przesyłania strumieniowego można zadeklarować tylko przy użyciu zapytań odczytywanych względem źródła przesyłania strumieniowego. Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania plików przesyłanych strumieniowo z magazynu obiektów w chmurze. Zobacz Auto loader SQL syntax (Składnia SQL modułu automatycznego ładowania).
Należy uwzględnić STREAM()
funkcję wokół nazwy zestawu danych podczas określania innych tabel lub widoków w potoku jako źródła przesyłania strumieniowego.
Poniżej opisano składnię deklarowania zmaterializowanych widoków i tabel przesyłania strumieniowego za pomocą języka SQL:
CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
AS select_statement
Tworzenie widoku tabel na żywo delty
Poniżej opisano składnię deklarowania widoków za pomocą języka SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Składnia SQL automatycznego modułu ładującego
Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map()
Za pomocą funkcji można przekazać dowolną cloud_files()
liczbę opcji do metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Aby uzyskać szczegółowe informacje na temat formatów i opcji obsługi, zobacz Opcje formatu pliku.
Przykład: Definiowanie tabel
Zestaw danych można utworzyć, odczytując z zewnętrznego źródła danych lub z zestawów danych zdefiniowanych w potoku. Aby odczytać z wewnętrznego zestawu danych, należy LIVE
wstępnie wpisać słowo kluczowe na nazwę zestawu danych. Poniższy przykład definiuje dwa różne zestawy danych: tabelę o nazwie taxi_raw
, która przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data
, która przyjmuje taxi_raw
tabelę jako dane wejściowe:
CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Przykład: odczyt ze źródła przesyłania strumieniowego
Aby odczytać dane ze źródła przesyłania strumieniowego, na przykład moduł automatycznego ładowania lub wewnętrzny zestaw danych, zdefiniuj tabelę STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą tabel delta live.
Kontrolowanie sposobu materializacji tabel
Tabele oferują również dodatkową kontrolę nad ich materializacją:
- Określ sposób partycjonowania tabel przy użyciu polecenia
PARTITIONED BY
. Możesz użyć partycjonowania, aby przyspieszyć wykonywanie zapytań. - Właściwości tabeli można ustawić przy użyciu polecenia
TBLPROPERTIES
. Zobacz Właściwości tabeli Tabele na żywo funkcji Delta. - Ustaw lokalizację magazynu przy użyciu
LOCATION
ustawienia . Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśliLOCATION
nie są ustawione. - W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określanie schematu i kolumn partycji.
Uwaga
W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca umożliwienie usłudze Delta Live Tables kontrolowania organizacji danych. Jeśli nie spodziewasz się, że tabela przekroczy terabajt, zazwyczaj nie należy określać kolumn partycji.
Przykład: określanie schematu i kolumn partycji
Opcjonalnie można określić schemat podczas definiowania tabeli. Poniższy przykład określa schemat tabeli docelowej, w tym użycie kolumn wygenerowanych przez usługę Delta Lake i zdefiniowanie kolumn partycji dla tabeli:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Domyślnie tabele na żywo delty wywnioskują schemat z table
definicji, jeśli nie określisz schematu.
Przykład: Definiowanie ograniczeń tabeli
Uwaga
Obsługa ograniczeń tabeli jest dostępna w publicznej wersji zapoznawczej. Aby zdefiniować ograniczenia tabeli, potok musi być potokiem obsługującym wykaz aparatu Unity i skonfigurowanym do korzystania z kanału preview
.
Podczas określania schematu można zdefiniować klucz podstawowy i klucze obce. Ograniczenia są informacyjne i nie są wymuszane. W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Ustawianie wartości konfiguracji dla tabeli lub widoku
Służy SET
do określania wartości konfiguracji dla tabeli lub widoku, w tym konfiguracji platformy Spark. Każda tabela lub widok zdefiniowany w notesie po SET
instrukcji ma dostęp do zdefiniowanej wartości. Wszystkie konfiguracje platformy Spark określone przy użyciu SET
instrukcji są używane podczas wykonywania zapytania Spark dla dowolnej tabeli lub widoku zgodnie z instrukcją SET. Aby odczytać wartość konfiguracji w zapytaniu, użyj składni ${}
interpolacji ciągów . W poniższym przykładzie ustawiono wartość konfiguracji platformy Spark o nazwie startDate
i użyto tej wartości w zapytaniu:
SET startDate='2020-01-01';
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Aby określić wiele wartości konfiguracji, użyj oddzielnej SET
instrukcji dla każdej wartości.
Właściwości sql
TWORZENIE TABELI LUB WIDOKU |
---|
TEMPORARY Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Klauzula TEMPORARY instruuje tabele delta Live Tables, aby utworzyć tabelę dostępną dla potoku, ale nie należy uzyskiwać dostępu poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który go tworzy, a nie tylko pojedynczą aktualizację. |
STREAMING Utwórz tabelę, która odczytuje wejściowy zestaw danych jako strumień. Wejściowy zestaw danych musi być źródłem danych przesyłania strumieniowego, na przykład modułem automatycznego ładowania lub tabelą STREAMING . |
PARTITIONED BY Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli. |
LOCATION Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system będzie domyślnie ustawiony na lokalizację przechowywania potoku. |
COMMENT Opcjonalny opis tabeli. |
column_constraint Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w kolumnie. |
table_constraint Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w tabeli. |
TBLPROPERTIES Opcjonalna lista właściwości tabeli. |
select_statement Zapytanie Delta Live Tables, które definiuje zestaw danych dla tabeli. |
CONSTRAINT, klauzula |
---|
EXPECT expectation_name Zdefiniuj ograniczenie expectation_name dotyczące jakości danych. Jeśli ON VIOLATION ograniczenie nie jest zdefiniowane, dodaj wiersze naruszające ograniczenie do docelowego zestawu danych. |
ON VIOLATION Opcjonalna akcja do wykonania dla wierszy, które zakończyły się niepowodzeniem: * FAIL UPDATE : Natychmiastowe zatrzymywanie wykonywania potoku.* DROP ROW : Upuść rekord i kontynuuj przetwarzanie. |
Zmienianie przechwytywania danych za pomocą bazy danych SQL w tabelach delta live
Użyj instrukcji APPLY CHANGES INTO
, aby użyć funkcji CDC tabel delta live, zgodnie z opisem w następujących artykułach:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Definiujesz ograniczenia dotyczące jakości danych dla APPLY CHANGES
obiektu docelowego przy użyciu tej samej CONSTRAINT
klauzuli co zapytania inneAPPLY CHANGES
niż zapytania. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.
Uwaga
Domyślne zachowanie dla INSERT
zdarzeń i UPDATE
dotyczy upsert zdarzeń CDC ze źródła: zaktualizować wszystkie wiersze w tabeli docelowej, które są zgodne z określonymi kluczami lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE
można określić za pomocą APPLY AS DELETE WHEN
warunku.
Ważne
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu APPLY CHANGES
tabeli docelowej należy również uwzględnić __START_AT
kolumny i __END_AT
o tym samym typie sequence_by
danych co pole.
Zobacz Stosowanie zmian interfejsu API: upraszczanie przechwytywania danych zmian w tabelach delta live.
Klauzule |
---|
KEYS Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Ta klauzula jest wymagana. |
IGNORE NULL UPDATES Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone ignoruj aktualizacje o wartości NULL, kolumny z wartością null zachowają istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null .Ta klauzula jest opcjonalna. Wartością domyślną jest zastąpienie istniejących kolumn wartościami null . |
APPLY AS DELETE WHEN Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako grób w bazowej tabeli delty, a widok jest tworzony w magazynie metadanych, który filtruje te grobowce. Interwał przechowywania można skonfigurować za pomocą poleceniapipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.Ta klauzula jest opcjonalna. |
APPLY AS TRUNCATE WHEN Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE . Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.Klauzula jest obsługiwana APPLY AS TRUNCATE WHEN tylko dla typu SCD 1. Typ SCD 2 nie obsługuje obcinania.Ta klauzula jest opcjonalna. |
SEQUENCE BY Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Funkcja Delta Live Tables używa tej sekwencjonowania w celu obsługi zdarzeń zmiany, które docierają poza kolejność. Ta klauzula jest wymagana. |
COLUMNS Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Można: * Określ pełną listę kolumn do uwzględnienia: COLUMNS (userId, name, city) .* Określ listę kolumn do wykluczenia: COLUMNS * EXCEPT (operation, sequenceNum) Ta klauzula jest opcjonalna. Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy klauzula nie zostanie określona COLUMNS . |
STORED AS Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ta klauzula jest opcjonalna. Wartość domyślna to SCD typ 1. |
TRACK HISTORY ON Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Można: * Określ pełną listę kolumn do śledzenia: COLUMNS (userId, name, city) .* Określ listę kolumn, które mają być wykluczone ze śledzenia: COLUMNS * EXCEPT (operation, sequenceNum) Ta klauzula jest opcjonalna. Wartość domyślna to śledzenie historii dla wszystkich kolumn wyjściowych, gdy istnieją jakiekolwiek zmiany, równoważne . TRACK HISTORY ON * |