Partilhar via


Referência da linguagem SQL Delta Live Tables

Este artigo fornece detalhes para a interface de programação SQL do Delta Live Tables.

  • Para obter informações sobre a API do Python, consulte a referência da linguagem Python Delta Live Tables.
  • Para obter mais informações sobre comandos SQL, consulte Referência da linguagem SQL.

Você pode usar funções definidas pelo usuário (UDFs) do Python em suas consultas SQL, mas deve definir esses UDFs em arquivos Python antes de chamá-los em arquivos de origem SQL. Consulte Funções escalares definidas pelo usuário - Python.

Limitações

A PIVOT cláusula não é suportada. A pivot operação no Spark requer carregamento ansioso de dados de entrada para calcular o esquema da saída. Esse recurso não é suportado no Delta Live Tables.

Criar uma visualização materializada do Delta Live Tables ou uma tabela de streaming

Você usa a mesma sintaxe SQL básica ao declarar uma tabela de streaming ou uma exibição materializada (também conhecida como LIVE TABLE).

Você só pode declarar tabelas de streaming usando consultas que são lidas em relação a uma fonte de streaming. A Databricks recomenda o uso do Auto Loader para transmitir a ingestão de arquivos do armazenamento de objetos na nuvem. Consulte Sintaxe SQL do carregador automático.

Você deve incluir a função em torno de um nome de STREAM() conjunto de dados ao especificar outras tabelas ou exibições em seu pipeline como uma fonte de streaming.

A sintaxe a seguir descreve a sintaxe para declarar exibições materializadas e tabelas de streaming com SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Criar uma visualização Delta Live Tables

A seguir descrevemos a sintaxe para declarar modos de exibição com SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintaxe SQL do Auto Loader

A seguir descrevemos a sintaxe para trabalhar com o Auto Loader em SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Você pode usar as opções de formato suportadas com o Auto Loader. Usando a map() função, você pode passar qualquer número de opções para o cloud_files() método. Opções são pares chave-valor, onde as chaves e os valores são cadeias de caracteres. Para obter detalhes sobre formatos e opções de suporte, consulte Opções de formato de arquivo.

Exemplo: Definir tabelas

Você pode criar um conjunto de dados lendo a partir de uma fonte de dados externa ou de conjuntos de dados definidos em um pipeline. Para ler a partir de um conjunto de dados interno, anexe a LIVE palavra-chave ao nome do conjunto de dados. O exemplo a seguir define dois conjuntos de dados diferentes: uma tabela chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a taxi_raw tabela como entrada:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exemplo: Ler a partir de uma fonte de streaming

Para ler dados de uma fonte de streaming, por exemplo, Auto Loader ou um conjunto de dados interno, defina uma STREAMING tabela:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Para obter mais informações sobre streaming de dados, consulte Transformar dados com tabelas Delta Live.

Controlar como as tabelas são materializadas

As tabelas também oferecem um controlo adicional da sua materialização:

  • Especifique como as tabelas são particionadas usando PARTITIONED BYo . Você pode usar o particionamento para acelerar as consultas.
  • Você pode definir as propriedades da tabela usando TBLPROPERTIES. Consulte Propriedades da tabela Delta Live Tables.
  • Defina um local de armazenamento usando a LOCATION configuração. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline se LOCATION não estiverem definidos.
  • Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de partição.

Nota

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização dos dados. A menos que você espere que sua tabela cresça além de um terabyte, geralmente não deve especificar colunas de partição.

Exemplo: Especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema ao definir uma tabela. O exemplo a seguir especifica o esquema para a tabela de destino, incluindo o uso de colunas geradas pelo Delta Lake e a definição de colunas de partição para a tabela:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Por padrão, Delta Live Tables infere o esquema da table definição se você não especificar um esquema.

Exemplo: Definir restrições de tabela

Nota

O suporte do Delta Live Tables para restrições de tabela está em Visualização pública. Para definir restrições de tabela, seu pipeline deve ser um pipeline habilitado para Unity Catalog e configurado para usar o preview canal.

Ao especificar um esquema, você pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrizar valores usados ao declarar tabelas ou exibições com SQL

Use SET para especificar um valor de configuração em uma consulta que declara uma tabela ou exibição, incluindo configurações do Spark. Qualquer tabela ou exibição definida em um bloco de anotações após a SET instrução ter acesso ao valor definido. Todas as configurações do Spark especificadas usando a SET instrução são usadas ao executar a consulta Spark para qualquer tabela ou exibição após a instrução SET. Para ler um valor de configuração em uma consulta, use a sintaxe ${}de interpolação de cadeia de caracteres . O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar vários valores de configuração, use uma instrução separada SET para cada valor.

Propriedades SQL

CRIAR TABELA ou VISTA
TEMPORARY

Crie uma tabela, mas não publique metadados para a tabela. A TEMPORARY cláusula instrui o Delta Live Tables a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.
STREAMING

Crie uma tabela que leia um conjunto de dados de entrada como um fluxo. O conjunto de dados de entrada deve ser uma fonte de dados de streaming, por exemplo, Auto Loader ou uma STREAMING tabela.
PARTITIONED BY

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
LOCATION

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o sistema assumirá como padrão o local de armazenamento do pipeline.
COMMENT

Uma descrição opcional para a tabela.
column_constraint

Uma chave primária informativa opcional ou restrição de chave estrangeira na coluna.
table_constraint

Uma chave primária informativa opcional ou restrição de chave estrangeira na tabela.
TBLPROPERTIES

Uma lista opcional de propriedades da tabela para a tabela.
select_statement

Uma consulta Delta Live Tables que define o conjunto de dados para a tabela.
Cláusula CONSTRAINT
EXPECT expectation_name

Definir restrição expectation_namede qualidade de dados . Se ON VIOLATION a restrição não estiver definida, adicione linhas que violem a restrição ao conjunto de dados de destino.
ON VIOLATION

Ação opcional a ser executada para linhas com falha:

* FAIL UPDATE: Pare imediatamente a execução do pipeline.
* DROP ROW: Solte o registro e continue o processamento.

Alterar a captura de dados com SQL no Delta Live Tables

Use a instrução para usar a APPLY CHANGES INTO funcionalidade Delta Live Tables CDC, conforme descrito a seguir:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Você define restrições de qualidade de dados para um APPLY CHANGES destino usando a mesma CONSTRAINT cláusula que as não-consultasAPPLY CHANGES . Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

Nota

O comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o APPLY CHANGES esquema da tabela de destino, você também deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que o sequence_by campo.

Consulte APPLY CHANGES API: Simplifique a captura de dados de alteração no Delta Live Tables.

Cláusulas
KEYS

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Esta cláusula é obrigatória.
IGNORE NULL UPDATES

Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com a null manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null.

Esta cláusula é opcional.

O padrão é substituir colunas existentes por null valores.
APPLY AS DELETE WHEN

Especifica quando um evento CDC deve ser tratado como um DELETE upsert em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSecondspropriedade table.

Esta cláusula é opcional.
APPLY AS TRUNCATE WHEN

Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

A APPLY AS TRUNCATE WHEN cláusula é suportada apenas para SCD tipo 1. SCD tipo 2 não suporta truncate.

Esta cláusula é opcional.
SEQUENCE BY

O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem.

Esta cláusula é obrigatória.
COLUMNS

Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Pode:

* Especifique a lista completa de colunas a incluir: COLUMNS (userId, name, city).
* Especifique uma lista de colunas a excluir: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é opcional.

O padrão é incluir todas as colunas na tabela de destino quando a COLUMNS cláusula não é especificada.
STORED AS

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
TRACK HISTORY ON

Especifica um subconjunto de colunas de saída para gerar registros de histórico quando houver alterações nessas colunas especificadas. Pode:

* Especifique a lista completa de colunas a acompanhar: COLUMNS (userId, name, city).
* Especifique uma lista de colunas a serem excluídas do rastreamento: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é opcional. O padrão é o histórico de controle para todas as colunas de saída quando houver alterações, equivalente a TRACK HISTORY ON *.