Dokumentacja języka SQL usługi Delta Live Tables

Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania SQL usługi Delta Live Tables.

  • Aby uzyskać informacje na temat interfejsu API języka Python, zobacz dokumentację języka Python tabel delta live tables.
  • Aby uzyskać więcej informacji na temat poleceń SQL, zobacz Dokumentacja języka SQL.

W zapytaniach SQL można używać funkcji zdefiniowanych przez użytkownika (UDF) języka Python, ale przed wywołaniem ich w plikach źródłowych SQL należy zdefiniować te funkcje zdefiniowane przez użytkownika. Zobacz Funkcje skalarne zdefiniowane przez użytkownika — Python.

Ograniczenia

Klauzula nie jest obsługiwana PIVOT . Operacja pivot na platformie Spark wymaga chętnego ładowania danych wejściowych w celu obliczenia schematu danych wyjściowych. Ta funkcja nie jest obsługiwana w tabelach delta live.

Tworzenie tabel delta live zmaterializowanego widoku lub tabeli przesyłania strumieniowego

Ta sama podstawowa składnia SQL jest używana podczas deklarowania tabeli przesyłania strumieniowego lub zmaterializowanego widoku (nazywanego LIVE TABLErównież ).

Tabele przesyłania strumieniowego można zadeklarować tylko przy użyciu zapytań odczytywanych względem źródła przesyłania strumieniowego. Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania plików przesyłanych strumieniowo z magazynu obiektów w chmurze. Zobacz Auto loader SQL syntax (Składnia SQL modułu automatycznego ładowania).

Należy uwzględnić STREAM() funkcję wokół nazwy zestawu danych podczas określania innych tabel lub widoków w potoku jako źródła przesyłania strumieniowego.

Poniżej opisano składnię deklarowania zmaterializowanych widoków i tabel przesyłania strumieniowego za pomocą języka SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Tworzenie widoku tabel na żywo delty

Poniżej opisano składnię deklarowania widoków za pomocą języka SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Składnia SQL automatycznego modułu ładującego

Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map() Za pomocą funkcji można przekazać dowolną cloud_files() liczbę opcji do metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Aby uzyskać szczegółowe informacje na temat formatów i opcji obsługi, zobacz Opcje formatu pliku.

Przykład: Definiowanie tabel

Zestaw danych można utworzyć, odczytując z zewnętrznego źródła danych lub z zestawów danych zdefiniowanych w potoku. Aby odczytać z wewnętrznego zestawu danych, należy LIVE wstępnie wpisać słowo kluczowe na nazwę zestawu danych. Poniższy przykład definiuje dwa różne zestawy danych: tabelę o nazwie taxi_raw , która przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data , która przyjmuje taxi_raw tabelę jako dane wejściowe:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Przykład: odczyt ze źródła przesyłania strumieniowego

Aby odczytać dane ze źródła przesyłania strumieniowego, na przykład moduł automatycznego ładowania lub wewnętrzny zestaw danych, zdefiniuj tabelę STREAMING :

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą tabel delta live.

Kontrolowanie sposobu materializacji tabel

Tabele oferują również dodatkową kontrolę nad ich materializacją:

  • Określ sposób partycjonowania tabel przy użyciu polecenia PARTITIONED BY. Możesz użyć partycjonowania, aby przyspieszyć wykonywanie zapytań.
  • Właściwości tabeli można ustawić przy użyciu polecenia TBLPROPERTIES. Zobacz Właściwości tabeli Tabele na żywo funkcji Delta.
  • Ustaw lokalizację magazynu przy użyciu LOCATION ustawienia . Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśli LOCATION nie są ustawione.
  • W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określanie schematu i kolumn partycji.

Uwaga

W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca umożliwienie usłudze Delta Live Tables kontrolowania organizacji danych. Jeśli nie spodziewasz się, że tabela przekroczy terabajt, zazwyczaj nie należy określać kolumn partycji.

Przykład: określanie schematu i kolumn partycji

Opcjonalnie można określić schemat podczas definiowania tabeli. Poniższy przykład określa schemat tabeli docelowej, w tym użycie kolumn wygenerowanych przez usługę Delta Lake i zdefiniowanie kolumn partycji dla tabeli:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Domyślnie tabele na żywo delty wywnioskują schemat z table definicji, jeśli nie określisz schematu.

Przykład: Definiowanie ograniczeń tabeli

Uwaga

Obsługa ograniczeń tabeli jest dostępna w publicznej wersji zapoznawczej. Aby zdefiniować ograniczenia tabeli, potok musi być potokiem obsługującym wykaz aparatu Unity i skonfigurowanym do korzystania z kanału preview .

Podczas określania schematu można zdefiniować klucz podstawowy i klucze obce. Ograniczenia są informacyjne i nie są wymuszane. W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Ustawianie wartości konfiguracji dla tabeli lub widoku

Służy SET do określania wartości konfiguracji dla tabeli lub widoku, w tym konfiguracji platformy Spark. Każda tabela lub widok zdefiniowany w notesie po SET instrukcji ma dostęp do zdefiniowanej wartości. Wszystkie konfiguracje platformy Spark określone przy użyciu SET instrukcji są używane podczas wykonywania zapytania Spark dla dowolnej tabeli lub widoku zgodnie z instrukcją SET. Aby odczytać wartość konfiguracji w zapytaniu, użyj składni ${}interpolacji ciągów . W poniższym przykładzie ustawiono wartość konfiguracji platformy Spark o nazwie startDate i użyto tej wartości w zapytaniu:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Aby określić wiele wartości konfiguracji, użyj oddzielnej SET instrukcji dla każdej wartości.

Właściwości sql

TWORZENIE TABELI LUB WIDOKU
TEMPORARY

Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Klauzula TEMPORARY instruuje tabele delta Live Tables, aby utworzyć tabelę dostępną dla potoku, ale nie należy uzyskiwać dostępu poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który go tworzy, a nie tylko pojedynczą aktualizację.
STREAMING

Utwórz tabelę, która odczytuje wejściowy zestaw danych jako strumień. Wejściowy zestaw danych musi być źródłem danych przesyłania strumieniowego, na przykład modułem automatycznego ładowania lub tabelą STREAMING .
PARTITIONED BY

Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
LOCATION

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system będzie domyślnie ustawiony na lokalizację przechowywania potoku.
COMMENT

Opcjonalny opis tabeli.
column_constraint

Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w kolumnie.
table_constraint

Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w tabeli.
TBLPROPERTIES

Opcjonalna lista właściwości tabeli.
select_statement

Zapytanie Delta Live Tables, które definiuje zestaw danych dla tabeli.
CONSTRAINT, klauzula
EXPECT expectation_name

Zdefiniuj ograniczenie expectation_namedotyczące jakości danych. Jeśli ON VIOLATION ograniczenie nie jest zdefiniowane, dodaj wiersze naruszające ograniczenie do docelowego zestawu danych.
ON VIOLATION

Opcjonalna akcja do wykonania dla wierszy, które zakończyły się niepowodzeniem:

* FAIL UPDATE: Natychmiastowe zatrzymywanie wykonywania potoku.
* DROP ROW: Upuść rekord i kontynuuj przetwarzanie.

Zmienianie przechwytywania danych za pomocą bazy danych SQL w tabelach delta live

Użyj instrukcji APPLY CHANGES INTO , aby użyć funkcji CDC tabel delta live, zgodnie z opisem w następujących artykułach:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Definiujesz ograniczenia dotyczące jakości danych dla APPLY CHANGES obiektu docelowego przy użyciu tej samej CONSTRAINT klauzuli co zapytania inneAPPLY CHANGES niż zapytania. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.

Uwaga

Domyślne zachowanie dla INSERT zdarzeń i UPDATE dotyczy upsert zdarzeń CDC ze źródła: zaktualizować wszystkie wiersze w tabeli docelowej, które są zgodne z określonymi kluczami lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić za pomocą APPLY AS DELETE WHEN warunku.

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu APPLY CHANGES tabeli docelowej należy również uwzględnić __START_AT kolumny i __END_AT o tym samym typie sequence_by danych co pole.

Zobacz Stosowanie zmian interfejsu API: upraszczanie przechwytywania danych zmian w tabelach delta live.

Klauzule
KEYS

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Ta klauzula jest wymagana.
IGNORE NULL UPDATES

Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone ignoruj aktualizacje o wartości NULL, kolumny z wartością null zachowają istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null.

Ta klauzula jest opcjonalna.

Wartością domyślną jest zastąpienie istniejących kolumn wartościami null .
APPLY AS DELETE WHEN

Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako grób w bazowej tabeli delty, a widok jest tworzony w magazynie metadanych, który filtruje te grobowce. Interwał przechowywania można skonfigurować za pomocą polecenia
pipelines.cdc.tombstoneGCThresholdInSecondswłaściwość tabeli.

Ta klauzula jest opcjonalna.
APPLY AS TRUNCATE WHEN

Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.

Klauzula jest obsługiwana APPLY AS TRUNCATE WHEN tylko dla typu SCD 1. Typ SCD 2 nie obsługuje obcinania.

Ta klauzula jest opcjonalna.
SEQUENCE BY

Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Funkcja Delta Live Tables używa tej sekwencjonowania w celu obsługi zdarzeń zmiany, które docierają poza kolejność.

Ta klauzula jest wymagana.
COLUMNS

Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Można:

* Określ pełną listę kolumn do uwzględnienia: COLUMNS (userId, name, city).
* Określ listę kolumn do wykluczenia: COLUMNS * EXCEPT (operation, sequenceNum)

Ta klauzula jest opcjonalna.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy klauzula nie zostanie określona COLUMNS .
STORED AS

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
TRACK HISTORY ON

Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Można:

* Określ pełną listę kolumn do śledzenia: COLUMNS (userId, name, city).
* Określ listę kolumn, które mają być wykluczone ze śledzenia: COLUMNS * EXCEPT (operation, sequenceNum)

Ta klauzula jest opcjonalna. Wartość domyślna to śledzenie historii dla wszystkich kolumn wyjściowych, gdy istnieją jakiekolwiek zmiany, równoważne .TRACK HISTORY ON *