Sql-språkreferens för Delta Live Tables

Den här artikeln innehåller information om SQL-programmeringsgränssnittet för Delta Live Tables.

  • Information om Python-API:et finns i python-språkreferensen Delta Live Tables.
  • Mer information om SQL-kommandon finns i SQL-språkreferens.

Du kan använda användardefinierade Python-funktioner (UDF: er) i dina SQL-frågor, men du måste definiera dessa UDF:er i Python-filer innan du anropar dem i SQL-källfiler. Se Användardefinierade skalärfunktioner – Python.

Begränsningar

Satsen PIVOT stöds inte. Åtgärden pivot i Spark kräver ivrig inläsning av indata för att beräkna schemat för utdata. Den här funktionen stöds inte i Delta Live Tables.

Skapa en materialiserad vy eller en strömmande tabell för Delta Live Tables

Du använder samma grundläggande SQL-syntax när du deklarerar antingen en strömmande tabell eller en materialiserad vy (kallas även för ).LIVE TABLE

Du kan bara deklarera strömmande tabeller med hjälp av frågor som läse mot en strömmande källa. Databricks rekommenderar att du använder Auto Loader för strömmande inmatning av filer från molnobjektlagring. Se SQL-syntax för automatisk inläsning.

Du måste inkludera STREAM() funktionen runt ett datauppsättningsnamn när du anger andra tabeller eller vyer i din pipeline som en strömmande källa.

Följande beskriver syntaxen för att deklarera materialiserade vyer och strömmande tabeller med SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Skapa en Delta Live Tables-vy

Följande beskriver syntaxen för att deklarera vyer med SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-syntax för automatisk inläsning

Följande beskriver syntaxen för att arbeta med automatisk inläsning i SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Du kan använda formatalternativ som stöds med Auto Loader. Med hjälp av map() funktionen kan du skicka valfritt antal alternativ till cloud_files() metoden. Alternativen är nyckel/värde-par, där nycklar och värden är strängar. Mer information om stödformat och alternativ finns i Alternativ för filformat.

Exempel: Definiera tabeller

Du kan skapa en datauppsättning genom att läsa från en extern datakälla eller från datauppsättningar som definierats i en pipeline. Om du vill läsa från en intern datauppsättning förbereder du nyckelordet LIVE till datamängdens namn. I följande exempel definieras två olika datauppsättningar: en tabell med namnet taxi_raw som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data som tar taxi_raw tabellen som indata:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exempel: Läsa från en strömmande källa

Om du vill läsa data från en strömmande källa, till exempel Automatisk inläsare eller en intern datauppsättning, definierar du en STREAMING tabell:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Mer information om strömmande data finns i Transformera data med Delta Live Tables.

Kontrollera hur tabeller materialiseras

Tabeller ger också ytterligare kontroll över materialiseringen:

Kommentar

För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att Delta Live Tables kan styra dataorganisationen. Om du inte förväntar dig att tabellen ska växa utöver en terabyte bör du vanligtvis inte ange partitionskolumner.

Exempel: Ange ett schema och partitionskolumner

Du kan också ange ett schema när du definierar en tabell. I följande exempel anges schemat för måltabellen, inklusive att använda Delta Lake-genererade kolumner och definiera partitionskolumner för tabellen:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Som standard härleder Delta Live Tables schemat från table definitionen om du inte anger något schema.

Exempel: Definiera tabellbegränsningar

Kommentar

Stöd för tabellbegränsningar finns i offentlig förhandsversion. För att definiera tabellbegränsningar måste din pipeline vara en Unity Catalog-aktiverad pipeline och konfigurerad för att använda preview kanalen.

När du anger ett schema kan du definiera primärnyckel och sekundärnycklar. Begränsningarna är informationsmässiga och tillämpas inte. I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Ange konfigurationsvärden för en tabell eller vy

Använd SET för att ange ett konfigurationsvärde för en tabell eller vy, inklusive Spark-konfigurationer. Alla tabeller eller vyer som du definierar i en notebook-fil efter att instruktionen SET har åtkomst till det definierade värdet. Alla Spark-konfigurationer som anges med instruktionen SET används när du kör Spark-frågan för en tabell eller vy som följer SET-instruktionen. Om du vill läsa ett konfigurationsvärde i en fråga använder du syntaxen för ${}stränginterpolation . I följande exempel anges ett Spark-konfigurationsvärde med namnet startDate och värdet används i en fråga:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Om du vill ange flera konfigurationsvärden använder du en separat SET instruktion för varje värde.

SQL-egenskaper

SKAPA TABELL ELLER VY
TEMPORARY

Skapa en tabell men publicera inte metadata för tabellen. TEMPORARY Satsen instruerar Delta Live Tables att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en tillfällig tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.
STREAMING

Skapa en tabell som läser en indatauppsättning som en dataström. Indatauppsättningen måste vara en strömmande datakälla, till exempel automatisk inläsning eller en STREAMING tabell.
PARTITIONED BY

En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen.
LOCATION

En valfri lagringsplats för tabelldata. Om det inte anges kommer systemet som standard att vara platsen för pipelinelagringen.
COMMENT

En valfri beskrivning för tabellen.
column_constraint

En valfri informations primärnyckel eller sekundärnyckelbegränsning för kolumnen.
table_constraint

En valfri informations primärnyckel eller sekundärnyckelbegränsning i tabellen.
TBLPROPERTIES

En valfri lista över tabellegenskaper för tabellen.
select_statement

En Delta Live Tables-fråga som definierar datauppsättningen för tabellen.
CONSTRAINT-sats
EXPECT expectation_name

Definiera villkor för expectation_namedatakvalitet . Om ON VIOLATION villkoret inte har definierats lägger du till rader som bryter mot villkoret för måldatauppsättningen.
ON VIOLATION

Valfri åtgärd att vidta för misslyckade rader:

* FAIL UPDATE: Stoppa omedelbart pipelinekörningen.
* DROP ROW: Släpp posten och fortsätt bearbetningen.

Ändra datainsamling med SQL i Delta Live Tables

Använd -instruktionen APPLY CHANGES INTO för att använda DELTA Live Tables CDC-funktioner enligt beskrivningen i följande:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Du definierar datakvalitetsbegränsningar för ett APPLY CHANGES mål med samma CONSTRAINT sats som icke-frågorAPPLY CHANGES . Se Hantera datakvalitet med Delta Live Tables.

Kommentar

Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen APPLY CHANGES måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by .

Se Förenklad insamling av ändringsdata med API:et APPLY CHANGES i Delta Live Tables.

Satser
KEYS

Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

Den här satsen krävs.
IGNORE NULL UPDATES

Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES har angetts behåller kolumner med en null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null.

Den här satsen är valfri.

Standardvärdet är att skriva över befintliga kolumner med null värden.
APPLY AS DELETE WHEN

Anger när en CDC-händelse ska behandlas som en DELETE i stället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskap.

Den här satsen är valfri.
APPLY AS TRUNCATE WHEN

Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

APPLY AS TRUNCATE WHEN Satsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkering.

Den här satsen är valfri.
SEQUENCE BY

Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Delta Live Tables använder den här sekvenseringen för att hantera ändringshändelser som kommer i fel ordning.

Den här satsen krävs.
COLUMNS

Anger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:

* Ange den fullständiga listan med kolumner som ska inkluderas: COLUMNS (userId, name, city).
* Ange en lista med kolumner som ska undantas: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen COLUMNS när satsen inte har angetts.
STORED AS

Om poster ska lagras som SCD-typ 1 eller SCD typ 2.

Den här satsen är valfri.

Standardvärdet är SCD typ 1.
TRACK HISTORY ON

Anger en delmängd av utdatakolumner för att generera historikposter när det finns ändringar i de angivna kolumnerna. Du kan antingen:

* Ange den fullständiga listan med kolumner som ska spåras: COLUMNS (userId, name, city).
* Ange en lista över kolumner som ska undantas från spårning: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri. Standardvärdet är spårningshistorik för alla utdatakolumner när det finns ändringar, motsvarande TRACK HISTORY ON *.