SKAPA DIREKTUPPSPELNINGSTABELL

Gäller för:markerad ja Databricks SQL markerad ja Databricks Runtime 13.3 LTS och senare

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion. Om du vill registrera dig för åtkomst fyller du i det här formuläret.

Skapar en strömmande tabell, en Delta-tabell med extra stöd för direktuppspelning eller inkrementell databearbetning.

Direktuppspelningstabeller stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Implementera en Delta Live Tables-pipeline med SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parametrar

  • UPPDATERA

    Om det anges uppdaterar tabellen med de senaste tillgängliga data från källorna som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras till nästa uppdatering.

  • OM INTE FINNS

    Om det anges och det redan finns en tabell med samma namn ignoreras -instruktionen.

    IF NOT EXISTS kan inte användas tillsammans med REFRESH, vilket innebär att CREATE OR REFRESH TABLE IF NOT EXISTS det inte är tillåtet.

  • Table_name

    Namnet på tabellen som ska skapas. Namnet får inte innehålla någon temporal specifikation. Om namnet inte är kvalificerat skapas tabellen i det aktuella schemat.

  • table_specification

    Den här valfria satsen definierar listan över kolumner, deras typer, egenskaper, beskrivningar och kolumnbegränsningar.

    Om du inte definierar kolumner i tabellschemat måste du ange AS query.

    • column_identifier

      Ett unikt namn för kolumnen.

      • column_type

        Anger kolumnens datatyp .

      • INTE NULL

        Om det anges accepterar NULL kolumnen inte värden.

      • KOMMENTAR column_comment

        En strängliteral som beskriver kolumnen.

      • column_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en primärnyckel eller sekundärnyckelbegränsning i kolumnen i en strömmande tabell. Begränsningar stöds inte för tabeller i hive_metastore katalogen.

      • BEGRÄNSNING expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | SLÄPP RAD } ]

        Lägger till datakvalitetsförväntningar i tabellen. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningstabellens händelselogg. En FAIL UPDATE förväntan gör att bearbetningen misslyckas när både tabellen skapas och tabellen uppdateras. En DROP ROW förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.

        expectation_expr kan bestå av literaler, kolumnidentifierare i tabellen och deterministiska, inbyggda SQL-funktioner eller operatorer förutom:

        Får inte heller expr innehålla någon underfråga.

      • table_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en informations primärnyckel eller informationsmässiga begränsningar för sekundärnyckel i en strömmande tabell. Viktiga begränsningar stöds inte för tabeller i hive_metastore katalogen.

  • table_clauses

    Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett uppdateringsschema för den nya tabellen. Varje undersats kan bara anges en gång.

    • PARTITIONERAD AV

      En valfri lista över kolumner i tabellen som tabellen ska partitioneras efter.

    • KOMMENTAR table_comment

      En STRING literal för att beskriva tabellen.

    • TBLPROPERTIES

      Du kan också ange en eller flera användardefinierade egenskaper.

    • SCHEMA [ UPPDATERA ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Om det tillhandahålls schemalägger du strömningstabellen eller den materialiserade vyn för att uppdatera sina data med det angivna cron-schemat för quartz . Endast time_zone_values accepteras. AT TIME ZONE LOCAL stöds inte. Om AT TIME ZONE den saknas används tidszonen för sessionen. Om AT TIME ZONE är frånvarande och sessionens tidszon inte har angetts utlöses ett fel. SCHEDULE är semantiskt likvärdigt med SCHEDULE REFRESH.

      Du kan inte använda syntaxen SCHEDULE i en pipelinedefinition för Delta Live Tables.

      Satsen SCHEDULE tillåts inte i ett CREATE OR REFRESH kommando. Schemat kan anges som en del av CREATE kommandot. Använd ALTER STREAMING TABLE för att ändra schemat för en strömningstabell när du har skapat den.

  • AS-fråga

    Den här satsen fyller i tabellen med hjälp av data från query. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet i STREAM valfri relation som du vill bearbeta stegvis. När du anger en query och en table_specification tillsammans måste tabellschemat som anges i table_specification innehålla alla kolumner som returneras av query, annars får du ett fel. Alla kolumner som anges i table_specification men inte returneras av query returvärden null när du frågar.

    Den här satsen krävs för strömmande tabeller som skapats i Databricks SQL, men krävs inte i Delta Live Tables. Om den här satsen inte anges i Delta Live Tables måste du referera till den här tabellen i ett APPLY CHANGES kommando i DLT-pipelinen. Se Ändra datainsamling med SQL i Delta Live Tables.

Skillnader mellan strömmande tabeller och andra tabeller

Direktuppspelningstabeller är tillståndskänsliga tabeller som endast är utformade för att hantera varje rad en gång när du bearbetar en växande datauppsättning. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tabeller bra för de flesta inmatningsarbetsbelastningar. Strömmande tabeller är optimala för pipelines som kräver data färskhet och låg svarstid. Strömningstabeller kan också vara användbara för omfattande skalningstransformeringar, eftersom resultaten kan beräknas stegvis när nya data anländer, vilket håller resultaten uppdaterade utan att helt omkomplera alla källdata med varje uppdatering. Strömmande tabeller är utformade för datakällor som endast är tilläggstabeller.

Strömmande tabeller accepterar ytterligare kommandon, till exempel REFRESH, som bearbetar de senaste data som är tillgängliga i de källor som anges i frågan. Ändringar i den angivna frågan återspeglas bara på nya data genom att anropa en REFRESH, inte tidigare bearbetade data. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL för att utföra en FULL REFRESH. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.

Begränsningar

  • Endast tabellägare kan uppdatera strömmande tabeller för att hämta de senaste data.

  • ALTER TABLE kommandon tillåts inte för strömmande tabeller. Tabellens definition och egenskaper bör ändras via -instruktionen ALTER STREAMING TABLE .

  • Frågor om tidsresor stöds inte.

  • Utveckla tabellschemat via DML-kommandon som INSERT INTO, och MERGE stöds inte.

  • Följande kommandon stöds inte i strömmande tabeller:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Deltadelning stöds inte.

  • Det går inte att byta namn på tabellen eller ändra ägaren.

  • Tabellbegränsningar som PRIMARY KEY och FOREIGN KEY stöds inte.

  • Genererade kolumner, identitetskolumner och standardkolumner stöds inte.

Exempel

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');