CREATE STREAMING TABLE

Gäller för:markerad ja Databricks SQL

Skapar en strömmande tabell, en Delta-tabell med extra stöd för direktuppspelning eller inkrementell databearbetning.

Direktuppspelningstabeller stöds endast i Lakeflow Spark Deklarativa pipelines och på Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Utveckla Lakeflow Spark Deklarativ pipelines-kod med SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Parameterar

  • REFRESH

    Om det anges uppdaterar tabellen med de senaste tillgängliga data från källorna som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras till nästa uppdatering. Uppdateringsåtgärden från CREATE OR REFRESH är helt deklarativ. Om ett uppdateringskommando inte anger alla metadata från den ursprungliga tabellskapandet tas de ospecificerade metadata bort.

  • OM INTE EXISTERAR

    Skapar streaming-tabellen om den inte finns. Om det redan finns en tabell med det här namnet ignoreras CREATE STREAMING TABLE-instruktionen.

    Du kan ange högst en av IF NOT EXISTS eller OR REFRESH.

  • table_name

    Namnet på tabellen som ska skapas. Namnet får inte innehålla en temporal specifikation eller alternativspecifikation. Om namnet inte är kvalificerat skapas tabellen i det aktuella schemat.

  • tabellspecifikation

    Den här valfria satsen definierar listan över kolumner, deras typer, egenskaper, beskrivningar och kolumnbegränsningar.

    Om du inte definierar kolumner i tabellschemat måste du ange AS query.

    • column_identifier

      Ett unikt namn för kolumnen.

      • kolumntyp

        Anger typen av data för kolumnen.

      • INTE NULL

        Om det anges accepterar kolumnen inte NULL värden.

      • KOMMENTAR column_comment

        En textsträng som beskriver kolumnen.

      • column_constraint

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en primärnyckel eller främmande nyckelbegränsning i kolumnen i en strömmande tabell. Begränsningar stöds inte för tabeller i hive_metastore katalogen.

      • MASK-villkor

        Lägger till en kolumnmaskfunktion för att anonymisera känsliga data. Alla efterföljande frågor från den kolumnen får resultatet av utvärderingen av funktionen över kolumnen i stället för kolumnens ursprungliga värde. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om värdet ska redigeras.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | TA BORT RAD } ]

        Lägger till datakvalitetsförväntningar i tabellen. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningstabellens händelselogg. En FAIL UPDATE förväntan gör att bearbetningen misslyckas både när du skapar tabellen och uppdaterar tabellen. En DROP ROW förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.

        expectation_expr kan bestå av literaler, kolumnidentifierare i tabellen och deterministiska, inbyggda SQL-funktioner eller operatorer förutom:

        Får inte heller expr innehålla någon underfråga.

      • tabellbegränsning

        Viktigt!

        Den här funktionen finns som allmänt tillgänglig förhandsversion.

        Lägger till en informationsprimärnyckel eller informationsreferensnyckelbegränsningar i en strömmande tabell. Viktiga begränsningar stöds inte för tabeller i hive_metastore katalogen.

  • tabell_klausuler

    Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett uppdateringsschema för den nya tabellen. Varje undersats får bara anges en gång.

    • PARTITIONERAD AV

      En valfri lista över kolumner i tabellen som tabellen ska partitioneras efter.

      Anteckning

      Flytande klustring ger en flexibel, optimerad lösning för klustring. Överväg att använda CLUSTER BY i stället för PARTITIONED BY för strömmande tabeller.

    • CLUSTER BY

      Ett valfritt villkor för att gruppera efter ett urval av kolumner. Använd automatisk flytande klustring med CLUSTER BY AUTO, och Databricks väljer intelligent klustringsnycklar för att optimera frågeprestanda. Se Använda flytande klustring för tabeller.

      Flytande klustring kan inte kombineras med PARTITIONED BY.

    • KOMMENTAR table_comment

      En STRING literal som beskriver tabellen.

    • STANDARD KOLLATION UTF8_BINARY

      Gäller för:check markerad ja Databricks SQL-kontroll markerad ja Databricks Runtime 17.1 och senare

      Tvingar in standardkollationeringen av strömningstabellen att vara UTF8_BINARY. Den här satsen är obligatorisk om schemat där tabellen skapas har en annan standardsortering än UTF8_BINARY. Standardsorteringen för strömningstabellen används som standardsortering inom query och för kolumntyper.

    • TBLPROPERTIES

      Du kan också ange en eller flera användardefinierade egenskaper.

      Använd den här inställningen för att ange den Lakeflow Spark deklarativa pipeline-körningskanal som används för att köra det här uttrycket. Ange värdet för egenskapen pipelines.channel till "PREVIEW" eller "CURRENT". Standardvärdet är "CURRENT". Mer information om Lakeflow Spark Declarativa Pipeline-kanaler finns under körningskanaler för Lakeflow Spark Deklarativa Pipelines.

    • schema

      Schemat kan antingen vara en SCHEDULE -instruktion eller en TRIGGER -instruktion.

      • SCHEDULE [ REFRESH ] schema_klausul

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Om du vill schemalägga en uppdatering som sker regelbundet använder du EVERY syntax. Om EVERY syntax anges uppdateras strömningstabellen eller den materialiserade vyn regelbundet med det angivna intervallet baserat på det angivna värdet, till exempel HOUR, HOURS, DAY, DAYS, WEEKeller WEEKS. I följande tabell visas godkända heltalsvärden för number.

          Tidsenhet Heltalsvärde
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= W <= 8

          Anteckning

          Singular- och pluralformerna i den inkluderade tidsenheten är semantiskt likvärdiga.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Så här schemalägger du en uppdatering med hjälp av ett Quartz-cronvärde . Giltiga time_zone_values godtas. AT TIME ZONE LOCAL stöds inte.

          Om AT TIME ZONE saknas används sessionens tidszon. Om AT TIME ZONE saknas och sessionens tidszon inte har angetts utlöses ett fel. SCHEDULE är semantiskt likvärdigt med SCHEDULE REFRESH.

        Schemat kan anges som en del av CREATE kommandot. Använd ALTER STREAMING TABLE eller kör kommandot CREATE OR REFRESH med SCHEDULE-satsen för att ändra schemat för en strömningstabell när den har skapats.

      • UTLÖSARE PÅ UPDATE [ HÖGST VARJE TRIGGER_INTERVAL ]

        Viktigt!

        Funktionen TRIGGER ON UPDATE finns i Beta.

        Du kan också ange att tabellen ska uppdateras när en överordnad datakälla uppdateras, högst en gång i minuten. Ange ett värde för för AT MOST EVERY att kräva minst en minsta tid mellan uppdateringarna.

        De överordnade datakällorna måste vara antingen externa eller hanterade Delta-tabeller (inklusive materialiserade vyer eller strömmande tabeller) eller hanterade vyer vars beroenden är begränsade till tabelltyper som stöds.

        Aktivering av filhändelser kan göra utlösare mer högpresterande och ökar vissa av gränserna för utlösaruppdateringar.

        trigger_interval är en INTERVAL-instruktion som är minst 1 minut.

        TRIGGER ON UPDATE har följande begränsningar

        • Högst 10 överordnade datakällor per strömmande tabell när du använder TRIGGER ON UPDATE.
        • Högst 1 000 strömmande tabeller eller materialiserade vyer kan anges med TRIGGER ON UPDATE.
        • AT MOST EVERY Satsen är som standard 1 minut och får inte vara mindre än 1 minut.
  • MED ROW FILTER-klausul

    Lägger till en radfilterfunktion i tabellen. Alla efterföljande frågor från tabellen tar emot en delmängd av de rader där funktionen utvärderas till boolesk TRUE. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om vissa rader ska filtreras.

  • AS-fråga

    Den här satsen fyller i tabellen med hjälp av data från query. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet i STREAM valfri relation som du vill bearbeta stegvis. När du anger en query och en table_specification tillsammans måste tabellschemat som anges i table_specification innehålla alla kolumner som returneras av query, annars får du ett fel. Alla kolumner som anges i table_specification men som inte returneras av query returnerar null värden när du frågar.

Skillnader mellan strömmande tabeller och andra tabeller

Direktuppspelningstabeller är tillståndskänsliga tabeller som endast är utformade för att hantera varje rad en gång när du bearbetar en växande datauppsättning. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tabeller bra för de flesta inmatningsarbetsbelastningar. Strömmande tabeller är optimala för pipelines som kräver att datan är färsk och låg latens. Strömningstabeller kan också vara användbara för omfattande skalningstransformeringar, eftersom resultaten kan beräknas stegvis när nya data anländer, vilket håller resultaten uppdaterade utan att helt omkomplera alla källdata med varje uppdatering. Strömmande tabeller är utformade för datakällor som endast är tillägg.

Strömmande tabeller accepterar ytterligare kommandon, till exempel REFRESH, som bearbetar de senaste data som är tillgängliga i de källor som anges i frågan. Ändringar i den angivna frågan återspeglas bara på nya data genom att anropa en REFRESH, inte tidigare behandlade data. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL för att utföra en FULL REFRESH. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.

Radfilter och kolumnmasker

Med radfilter kan du ange en funktion som tillämpas som ett filter när en tabellgenomsökning hämtar rader. Dessa filter säkerställer att efterföljande frågor endast returnerar rader som filterpredikatet utvärderas till sant för.

Med kolumnmasker kan du maskera en kolumns värden när en tabellgenomsökning hämtar rader. Alla framtida frågor som rör den kolumnen får resultatet av utvärderingen av funktionen över kolumnen och ersätter kolumnens ursprungliga värde.

Mer information om hur du använder radfilter och kolumnmasker finns i Radfilter och kolumnmasker.

Hantera radfilter och kolumnmasker

Radfilter och kolumnmasker i strömmande tabeller bör läggas till, uppdateras eller tas bort via CREATE OR REFRESH-instruktionen.

Funktionssätt

  • Uppdatera som definierare: När -instruktionen CREATE OR REFRESH eller REFRESH uppdaterar en strömmande tabell körs radfilterfunktionerna med definierarens rättigheter (som tabellägare). Det innebär att tabelluppdateringen använder säkerhetskontexten för den användare som skapade strömningstabellen.
  • Fråga: De flesta filter körs med definierarens rättigheter, men funktioner som kontrollerar användarkontexten (till exempel CURRENT_USER och IS_MEMBER) är undantag. Dessa funktioner körs av den som anropar dem. Den här metoden tillämpar användarspecifika datasäkerhets- och åtkomstkontroller baserat på den aktuella användarens kontext.

Överskådlighet

Använd DESCRIBE EXTENDED, INFORMATION_SCHEMAeller Katalogutforskaren för att undersöka befintliga radfilter och kolumnmasker som gäller för en viss strömmande tabell. Med den här funktionen kan användare revidera och granska dataåtkomst och skyddsåtgärder för strömmande tabeller.

Begränsningar

  • Endast tabellägare kan uppdatera strömmande tabeller för att hämta de senaste data.
  • ALTER TABLE kommandon tillåts inte för strömmande tabeller. Tabellens definition och egenskaper bör ändras via instruktionen CREATE OR REFRESH eller ALTER STREAMING TABLE.
  • Det går inte att utveckla tabellschemat via DML-kommandon som INSERT INTOoch MERGE.
  • Följande kommandon stöds inte i strömmande tabeller:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing stöds inte.
  • Det går inte att byta namn på tabellen eller ändra ägaren.
  • Tabellbegränsningar som PRIMARY KEY och FOREIGN KEY stöds inte för strömmande tabeller i hive_metastore katalogen.
  • Genererade kolumner, identitetskolumner och standardkolumner stöds inte.

Exempel

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')