TWORZENIE TABELI PRZESYŁANIA STRUMIENIOWEGO

Dotyczy:zaznacz pole wyboru oznaczone jako tak Databricks SQL zaznacz pole wyboru oznaczone jako tak Databricks Runtime 13.3 LTS i nowsze

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej. Aby zarejestrować się w celu uzyskania dostępu, wypełnij ten formularz.

Tworzy tabelę przesyłania strumieniowego, tabelę delty z dodatkową obsługą przesyłania strumieniowego lub przyrostowego przetwarzania danych.

Tabele przesyłania strumieniowego są obsługiwane tylko w tabelach delta live i w usłudze Databricks SQL z wykazem aparatu Unity. Uruchomienie tego polecenia w obsługiwanym środowisku Databricks Runtime oblicza tylko składnię. Zobacz Implementowanie potoku delty tabel na żywo za pomocą języka SQL.

Składnia

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parametry

  • REFRESH

    Jeśli zostanie określona, odświeża tabelę z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu. Tylko nowe dane, które docierają przed rozpoczęciem zapytania, są przetwarzane. Nowe dane dodawane do źródeł podczas wykonywania polecenia są ignorowane do następnego odświeżania.

  • JEŚLI NIE ISTNIEJE

    Jeśli określono i tabela o tej samej nazwie już istnieje, instrukcja jest ignorowana.

    IF NOT EXISTS nie można używać razem z parametrem REFRESH, co oznacza CREATE OR REFRESH TABLE IF NOT EXISTS , że nie jest dozwolone.

  • Nazwa_tabeli

    Nazwa tabeli do utworzenia. Nazwa nie może zawierać specyfikacji czasowej. Jeśli nazwa nie jest kwalifikowana, tabela zostanie utworzona w bieżącym schemacie.

  • table_specification

    Ta klauzula opcjonalna definiuje listę kolumn, ich typów, właściwości, opisów i ograniczeń kolumn.

    Jeśli nie zdefiniujesz kolumn w schemacie tabeli, musisz określić wartość AS query.

    • column_identifier

      Unikatowa nazwa kolumny.

      • Column_type

        Określa typ danych kolumny.

      • NOT NULL

        Jeśli określono kolumnę, nie akceptuje NULL wartości.

      • COLUMN_COMMENT KOMENTARZ

        Literał ciągu opisujący kolumnę.

      • column_constraint

        Ważne

        Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

        Dodaje ograniczenie klucza podstawowego lub klucza obcego do kolumny w tabeli przesyłania strumieniowego. Ograniczenia nie są obsługiwane w przypadku tabel w wykazie hive_metastore .

      • OGRANICZENIE EXPECTATION_NAME OCZEKIWANO (expectation_expr) [ PRZY NARUSZENIU { NIEPOWODZENIE AKTUALIZACJI | DROP ROW } ]

        Dodaje oczekiwania dotyczące jakości danych do tabeli. Te oczekiwania dotyczące jakości danych można śledzić w czasie i uzyskiwać do nich dostęp za pośrednictwem dziennika zdarzeń tabeli przesyłania strumieniowego. Oczekiwanie FAIL UPDATE powoduje niepowodzenie przetwarzania podczas tworzenia tabeli, a także odświeżania tabeli. Oczekiwanie DROP ROW powoduje porzucenie całego wiersza, jeśli oczekiwanie nie zostanie spełnione.

        expectation_expr Może składać się z literałów, identyfikatorów kolumn w tabeli oraz deterministycznych, wbudowanych funkcji LUB operatorów SQL, z wyjątkiem:

        Ponadto expr nie może zawierać żadnego podzapytania.

      • table_constraint

        Ważne

        Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

        Dodaje informacyjne podstawowe lub informacyjne ograniczenia klucza obcego do tabeli przesyłania strumieniowego. Ograniczenia klucza nie są obsługiwane w przypadku tabel w wykazie hive_metastore .

  • table_clauses

    Opcjonalnie określ partycjonowanie, komentarze, właściwości zdefiniowane przez użytkownika i harmonogram odświeżania nowej tabeli. Każda klauzula podrzędna może być określona tylko raz.

    • PARTYCJONOWANE PRZEZ

      Opcjonalna lista kolumn tabeli do partycjonowania tabeli według.

    • TABLE_COMMENT KOMENTARZ

      Literał STRING opisujący tabelę.

    • TBLPROPERTIES

      Opcjonalnie ustawia co najmniej jedną właściwość zdefiniowaną przez użytkownika.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Jeśli zostanie podana, zaplanuje tabelę przesyłania strumieniowego lub zmaterializowany widok, aby odświeżyć dane przy użyciu danego harmonogramu kronu kwarcowego. Akceptowane są tylko time_zone_values . AT TIME ZONE LOCAL nie jest obsługiwana. Jeśli AT TIME ZONE jest nieobecny, używana jest strefa czasowa sesji. Jeśli AT TIME ZONE jest nieobecny, a strefa czasowa sesji nie jest ustawiona, zostanie zgłoszony błąd. SCHEDULEjest semantycznie równoważne .SCHEDULE REFRESH

      Nie można użyć SCHEDULE składni w definicji potoku delta Live Tables.

      Klauzula SCHEDULE nie jest dozwolona w poleceniu CREATE OR REFRESH . Harmonogram można podać w ramach CREATE polecenia . Użyj funkcji ALTER STREAMING TABLE , aby zmienić harmonogram tabeli przesyłania strumieniowego po utworzeniu.

  • Zapytanie AS

    Ta klauzula wypełnia tabelę przy użyciu danych z query. To zapytanie musi być zapytaniem przesyłanym strumieniowo. Można to osiągnąć, dodając STREAM słowo kluczowe do dowolnej relacji, którą chcesz przetwarzać przyrostowo. Po określeniu query elementu i table_specification razem schemat tabeli określony w table_specification elemecie musi zawierać wszystkie kolumny zwrócone przez queryelement , w przeciwnym razie zostanie wyświetlony błąd. Wszystkie kolumny określone w elemecie table_specification , ale nie są zwracane przez query wartości zwracane null podczas wykonywania zapytania.

    Ta klauzula jest wymagana w przypadku tabel przesyłania strumieniowego utworzonych w usłudze Databricks SQL, ale nie jest wymagana w tabelach delta live. Jeśli ta klauzula nie jest podana w tabelach delta Live Tables, należy odwołać się do tej tabeli w APPLY CHANGES poleceniu w potoku DLT. Zobacz Przechwytywanie zmian danych za pomocą języka SQL w tabelach delta live.

Różnice między tabelami przesyłania strumieniowego a innymi tabelami

Tabele przesyłania strumieniowego to tabele stanowe, przeznaczone do obsługi każdego wiersza tylko raz podczas przetwarzania rosnącego zestawu danych. Ponieważ większość zestawów danych stale rośnie wraz z upływem czasu, tabele przesyłania strumieniowego są dobre dla większości obciążeń pozyskiwania. Tabele przesyłania strumieniowego są optymalne dla potoków, które wymagają świeżości danych i małych opóźnień. Tabele przesyłania strumieniowego mogą być również przydatne w przypadku transformacji na dużą skalę, ponieważ wyniki mogą być obliczane przyrostowo w miarę nadejścia nowych danych, zapewniając aktualność wyników bez konieczności pełnej ponownej kompilacji wszystkich danych źródłowych przy każdej aktualizacji. Tabele przesyłania strumieniowego są przeznaczone dla źródeł danych, które są tylko dołączane.

Tabele przesyłania strumieniowego akceptują dodatkowe polecenia, takie jak REFRESH, które przetwarzają najnowsze dane dostępne w źródłach podanych w zapytaniu. Zmiany w podanym zapytaniu są odzwierciedlane tylko na nowych danych przez wywołanie REFRESHelementu , który nie został wcześniej przetworzony. Aby zastosować zmiany w istniejących danych, należy wykonać REFRESH TABLE <table_name> FULL polecenie , aby wykonać polecenie FULL REFRESH. Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.

Ograniczenia

  • Tylko właściciele tabel mogą odświeżać tabele przesyłania strumieniowego, aby uzyskać najnowsze dane.

  • ALTER TABLE polecenia są niedozwolone w tabelach przesyłania strumieniowego. Definicja i właściwości tabeli powinny zostać zmienione za pomocą instrukcji ALTER STREAMING TABLE .

  • Zapytania dotyczące podróży w czasie nie są obsługiwane.

  • Ewolucja schematu tabeli za pomocą poleceń DML, takich jak INSERT INTO, i MERGE nie jest obsługiwana.

  • Następujące polecenia nie są obsługiwane w tabelach przesyłania strumieniowego:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Udostępnianie różnicowe nie jest obsługiwane.

  • Zmiana nazwy tabeli lub zmiana właściciela nie jest obsługiwana.

  • Ograniczenia tabeli, takie jak PRIMARY KEY i FOREIGN KEY nie są obsługiwane.

  • Wygenerowane kolumny, kolumny tożsamości i kolumny domyślne nie są obsługiwane.

Przykłady

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');