Delen via


Sql-taalreferentie voor Delta Live Tables

In dit artikel vindt u meer informatie over de SQL-programmeerinterface van Delta Live Tables.

U kunt door de gebruiker gedefinieerde Python-functies (UDF's) in uw SQL-query's gebruiken, maar u moet deze UDF's definiëren in Python-bestanden voordat u ze aanroept in SQL-bronbestanden. Zie door de gebruiker gedefinieerde scalaire functies - Python.

Beperkingen

De PIVOT component wordt niet ondersteund. De pivot bewerking in Spark vereist het gretige laden van invoergegevens om het uitvoerschema te berekenen. Deze mogelijkheid wordt niet ondersteund in Delta Live Tables.

Een gerealiseerde weergave of streamingtabel voor Delta Live-tabellen maken

Notitie

  • De CREATE OR REFRESH LIVE TABLE syntaxis voor het maken van een gerealiseerde weergave is afgeschaft. Gebruik in plaats daarvan CREATE OR REFRESH MATERIALIZED VIEW.
  • Als u de CLUSTER BY component wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

U gebruikt dezelfde eenvoudige SQL-syntaxis bij het declareren van een streamingtabel of een gerealiseerde weergave.

Een gerealiseerde weergave delta livetabellen declareren met SQL

Hier volgt een beschrijving van de syntaxis voor het declareren van een gerealiseerde weergave in Delta Live Tables met SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Een streamingtabel voor Delta Live Tables declareren met SQL

U kunt alleen streamingtabellen declareren met behulp van query's die worden gelezen op basis van een streamingbron. Databricks raadt het gebruik van automatische laadprogramma's aan voor het streamen van bestanden uit de opslag van cloudobjecten. Zie de SQL-syntaxis van het automatisch laden.

Wanneer u andere tabellen of weergaven in uw pijplijn opgeeft als streamingbronnen, moet u de STREAM() functie rond een naam van een gegevensset opnemen.

Hieronder wordt de syntaxis beschreven voor het declareren van een streamingtabel in Delta Live Tables met SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Een Delta Live Tables-weergave maken

Hier volgt een beschrijving van de syntaxis voor het declareren van weergaven met SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-syntaxis voor automatisch laden

Hier volgt een beschrijving van de syntaxis voor het werken met automatisch laden in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

U kunt ondersteunde indelingsopties gebruiken met autolaadprogramma's. Met behulp van de map() functie kunt u opties doorgeven aan de read_files() methode. Opties zijn sleutel-waardeparen, waarbij de sleutels en waarden tekenreeksen zijn. Zie De bestandsindelingsopties voor meer informatie over ondersteuningsindelingen en opties.

Voorbeeld: Tabellen definiëren

U kunt een gegevensset maken door gegevens te lezen uit een externe gegevensbron of uit gegevenssets die zijn gedefinieerd in een pijplijn. Als u een interne gegevensset wilt lezen, moet u het LIVE trefwoord vooraf laten gaan aan de naam van de gegevensset. In het volgende voorbeeld worden twee verschillende gegevenssets gedefinieerd: een tabel taxi_raw die een JSON-bestand als invoerbron gebruikt en een tabel filtered_data die de taxi_raw tabel als invoer gebruikt:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Voorbeeld: Lezen uit een streamingbron

Als u gegevens wilt lezen uit een streamingbron, bijvoorbeeld Automatisch laden of een interne gegevensset, definieert u een STREAMING tabel:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Zie Gegevens transformeren met Delta Live Tables voor meer informatie over het streamen van gegevens.

Bepalen hoe tabellen worden gerealiseerd

Tabellen bieden ook extra controle over hun materialisatie:

Notitie

Voor tabellen met een grootte van minder dan 1 TB raadt Databricks aan om delta livetabellen de gegevensorganisatie te laten beheren. Tenzij u verwacht dat uw tabel groter wordt dan een terabyte, raadt Databricks aan dat u geen partitiekolommen opgeeft.

Voorbeeld: Een schema en partitiekolommen opgeven

U kunt desgewenst een schema opgeven wanneer u een tabel definieert. In het volgende voorbeeld wordt het schema voor de doeltabel opgegeven, waaronder het gebruik van door Delta Lake gegenereerde kolommen en het definiëren van partitiekolommen voor de tabel:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Delta Live Tables geeft standaard het schema af van de table definitie als u geen schema opgeeft.

Voorbeeld: Tabelbeperkingen definiëren

Notitie

Ondersteuning voor Delta Live Tables voor tabelbeperkingen bevindt zich in openbare preview. Als u tabelbeperkingen wilt definiëren, moet uw pijplijn een Unity Catalog-pijplijn zijn en zijn geconfigureerd voor het gebruik van het preview kanaal.

Wanneer u een schema opgeeft, kunt u primaire en refererende sleutels definiëren. De beperkingen zijn informatief en worden niet afgedwongen. Zie de CONSTRAINT-component in de sql-taalreferentie.

In het volgende voorbeeld wordt een tabel met een primaire en refererende-sleutelbeperking gedefinieerd:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Waarden parameteriseren die worden gebruikt bij het declareren van tabellen of weergaven met SQL

Hiermee SET geeft u een configuratiewaarde op in een query die een tabel of weergave declareert, inclusief Spark-configuraties. Elke tabel of weergave die u in een notitieblok definieert nadat de SET instructie toegang heeft tot de gedefinieerde waarde. Spark-configuraties die zijn opgegeven met behulp van de SET instructie, worden gebruikt bij het uitvoeren van de Spark-query voor een tabel of weergave na de SET-instructie. Als u een configuratiewaarde in een query wilt lezen, gebruikt u de syntaxis ${}van de tekenreeksinterpolatie. In het volgende voorbeeld wordt een Spark-configuratiewaarde met de naam ingesteld startDate en wordt die waarde in een query gebruikt:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Als u meerdere configuratiewaarden wilt opgeven, gebruikt u een afzonderlijke SET instructie voor elke waarde.

Voorbeeld: Een rijfilter en kolommasker definiëren

Belangrijk

Rijfilters en kolommaskers bevinden zich in openbare preview.

Als u een gerealiseerde weergave of streamingtabel met een rijfilter en kolommasker wilt maken, gebruikt u de component ROW FILTER en de MASK-component. In het volgende voorbeeld ziet u hoe u een gerealiseerde weergave en een streamingtabel definieert met zowel een rijfilter als een kolommasker:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Zie Tabellen publiceren met rijfilters en kolommaskers voor meer informatie over rijfilters en kolommaskers.

SQL-eigenschappen

Notitie

Als u de CLUSTER BY component wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

TABEL OF WEERGAVE MAKEN
TEMPORARY

Maak een tabel, maar publiceer geen metagegevens voor de tabel. Met TEMPORARY de component worden Delta Live Tables geïnstrueerd om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, blijft een tijdelijke tabel behouden gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.
STREAMING

Maak een tabel die een invoergegevensset leest als een stroom. De invoergegevensset moet een streaminggegevensbron zijn, bijvoorbeeld Automatisch laden of een STREAMING tabel.
CLUSTER BY

Schakel vloeistofclustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clustersleutels.

Zie Liquid clustering gebruiken voor Delta-tabellen.
PARTITIONED BY

Een optionele lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel.
LOCATION

Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn.
COMMENT

Een optionele beschrijving voor de tabel.
column_constraint

Een optionele primaire sleutel of refererende sleutelbeperking voor de kolom.
MASK clause (Openbare preview)

Voegt een kolommaskerfunctie toe om gevoelige gegevens anoniem te maken. Toekomstige query's voor die kolom retourneren het resultaat van de geëvalueerde functie in plaats van de oorspronkelijke waarde van de kolom. Dit is handig voor fijnmazig toegangsbeheer, omdat de functie de identiteit en groepslidmaatschappen van de gebruiker kan controleren om te bepalen of de waarde moet worden bewerkt.

Zie de component Kolommasker.
table_constraint

Een optionele primaire sleutel of refererende sleutelbeperking voor de tabel.
TBLPROPERTIES

Een optionele lijst met tabeleigenschappen voor de tabel.
WITH ROW FILTER clause (Openbare preview)

Hiermee voegt u een rijfilterfunctie toe aan de tabel. Toekomstige query's voor die tabel ontvangen een subset van de rijen waarvoor de functie WAAR oplevert. Dit is handig voor gedetailleerd toegangsbeheer, omdat hiermee de functie de identiteit en groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of bepaalde rijen moeten worden gefilterd.

Zie de COMPONENT ROW FILTER.
select_statement

Een Delta Live Tables-query waarmee de gegevensset voor de tabel wordt gedefinieerd.
CONSTRAINT-component
EXPECT expectation_name

Beperking voor gegevenskwaliteit expectation_namedefiniëren. Als de ON VIOLATION beperking niet is gedefinieerd, voegt u rijen toe die de beperking aan de doelgegevensset schenden.
ON VIOLATION

Optionele actie die moet worden uitgevoerd voor mislukte rijen:

- FAIL UPDATE: Stop de uitvoering van pijplijnen onmiddellijk.
- DROP ROW: Verwijder de record en ga verder met verwerken.

Gegevens vastleggen wijzigen met SQL in Delta Live Tables

Gebruik de instructie voor het APPLY CHANGES INTO gebruik van CDC-functionaliteit voor Delta Live Tables, zoals beschreven in het volgende:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

U definieert beperkingen voor gegevenskwaliteit voor een APPLY CHANGES doel met dezelfde CONSTRAINT component als niet-query'sAPPLY CHANGES . Zie Gegevenskwaliteit beheren met Delta Live Tables.

Notitie

Het standaardgedrag voor INSERT en UPDATE gebeurtenissen is om CDC-gebeurtenissen uit de bron te upsert : werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutel(en) of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN voorwaarde.

Belangrijk

U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de APPLY CHANGES doeltabel opgeeft, moet u ook de __START_AT en __END_AT kolommen met hetzelfde gegevenstype als het sequence_by veld opnemen.

Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.

Clausules
KEYS

De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.

Als u een combinatie van kolommen wilt definiëren, gebruikt u een door komma's gescheiden lijst met kolommen.

Deze component is vereist.
IGNORE NULL UPDATES

Toestaan dat updates worden opgenomen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en IGNORE NULL UPDATES is opgegeven, behouden kolommen met een null kolom de bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met een waarde van null.

Deze component is optioneel.

De standaardinstelling is om bestaande kolommen met null waarden te overschrijven.
APPLY AS DELETE WHEN

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE upsert in plaats van een upsert. Als u verouderde gegevens wilt verwerken, wordt de verwijderde rij tijdelijk bewaard als tombstone in de onderliggende Delta-tabel en wordt er een weergave gemaakt in de metastore die deze tombstones filtert. Het bewaarinterval kan worden geconfigureerd met de
pipelines.cdc.tombstoneGCThresholdInSecondstabeleigenschap.

Deze component is optioneel.
APPLY AS TRUNCATE WHEN

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE. Omdat deze component een volledig afkappen van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases waarvoor deze functionaliteit is vereist.

De APPLY AS TRUNCATE WHEN component wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor de afkappende bewerking.

Deze component is optioneel.
SEQUENCE BY

De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. Delta Live Tables maakt gebruik van deze sequentiëring om wijzigingsgebeurtenissen af te handelen die niet op volgorde aankomen.

De opgegeven kolom moet een sorteerbaar gegevenstype zijn.

Deze component is vereist.
COLUMNS

Hiermee geeft u een subset van kolommen op die moeten worden opgenomen in de doeltabel. U hebt de volgende mogelijkheden:

- Geef de volledige lijst met kolommen op die moeten worden opgenomen: COLUMNS (userId, name, city).
- Geef een lijst met kolommen op die moeten worden uitgesloten: COLUMNS * EXCEPT (operation, sequenceNum)

Deze component is optioneel.

De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer de COLUMNS component niet is opgegeven.
STORED AS

Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.

Deze component is optioneel.

De standaardwaarde is SCD type 1.
TRACK HISTORY ON

Hiermee geeft u een subset van uitvoerkolommen op voor het genereren van geschiedenisrecords wanneer er wijzigingen zijn in die opgegeven kolommen. U hebt de volgende mogelijkheden:

- Geef de volledige lijst met kolommen op die moeten worden bijgehouden: COLUMNS (userId, name, city).
- Geef een lijst op met kolommen die moeten worden uitgesloten van het bijhouden: COLUMNS * EXCEPT (operation, sequenceNum)

Deze component is optioneel. De standaardinstelling is het bijhouden van de geschiedenis voor alle uitvoerkolommen wanneer er wijzigingen zijn, gelijk aan TRACK HISTORY ON *.