Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O Lakeflow Spark Declarative Pipelines (SDP) introduz várias novas palavras-chave e funções SQL para definir exibições materializadas e tabelas de streaming em pipelines. O suporte SQL para o desenvolvimento de pipelines baseia-se nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de Streaming Estruturado.
Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python suporta testes e operações mais abrangentes que são difíceis de implementar com SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.
Para obter uma referência completa da sintaxe SQL do pipeline, consulte Referência da linguagem SQL do pipeline.
Noções básicas de SQL para desenvolvimento de pipeline
O código SQL que cria conjuntos de dados de pipeline usa a sintaxe CREATE OR REFRESH para definir vistas materializadas e tabelas de streaming em relação aos resultados da consulta.
A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com semântica de streaming.
As operações de leitura e escrita usam por padrão o catálogo e o esquema especificados durante a configuração do pipeline. Consulte Defina o catálogo de destino e o esquema.
O código-fonte do pipeline difere criticamente dos scripts SQL: O SDP avalia todas as definições de conjunto de dados em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que qualquer consulta seja executada. A ordem das consultas que aparecem nos arquivos de origem define a ordem de avaliação do código, mas não a ordem de execução da consulta.
Criar uma vista materializada com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma exibição materializada com SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Criar uma tabela de streaming com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de streaming com SQL. Ao ler uma fonte para uma tabela de streaming, a STREAM palavra-chave indica usar a semântica de streaming para a fonte. Não use a STREAM palavra-chave ao criar uma visualização materializada:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Observação
Utilize a palavra-chave STREAM para aplicar a semântica de transmissão e ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler a partir de fontes estáticas ou apenas de anexação. Para ingerir dados que têm confirmações de alteração, podes usar Python e a opção SkipChangeCommits para manipular erros.
Carregar dados do armazenamento de objetos
Os pipelines suportam o carregamento de dados de todos os formatos suportados pelo Azure Databricks. Veja Opções de formato de dados.
Observação
Estes exemplos usam dados disponíveis no diretório /databricks-datasets, montados automaticamente no seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte Quais são os volumes do Catálogo Unity?.
O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Veja O que é o Auto Loader?.
SQL usa a função read_files para invocar a funcionalidade Auto Loader. Você também deve usar a palavra-chave STREAM para configurar uma leitura contínua com read_files.
O seguinte descreve a sintaxe para read_files em SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
As opções para Auto Loader são pares chave-valor. Para obter detalhes sobre os formatos e opções suportados, consulte Opções .
O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
A função read_files também suporta semântica em lote para criar visualizações materializadas. O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Valide dados com expectativas
Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerir a qualidade dos dados com as expectativas do fluxo de dados.
O código a seguir define uma expectativa chamada valid_data que descarta registros que são nulos durante a ingestão de dados:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Consultar visões materializadas e tabelas de streaming definidas no seu pipeline
O exemplo a seguir define quatro conjuntos de dados:
- Uma tabela de streaming chamada
ordersque carrega dados JSON. - Uma exibição materializada chamada
customersque carrega dados CSV. - Uma vista materializada chamada
customer_ordersque une registos dos conjuntos de dadosordersecustomers, converte o carimbo de data/hora da encomenda numa data e seleciona os camposcustomer_id,order_number,stateeorder_date. - Uma vista materializada denominada
daily_orders_by_stateque agrega o número diário de pedidos em cada estado.
Observação
Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são escritas e lidas do catálogo e esquema padrão configurados para seu pipeline.
O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe do esquema LIVE é silenciosamente ignorada. Ver LIVE schema (legacy).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Definir uma tabela privada
Você pode usar a PRIVATE cláusula ao criar uma exibição materializada ou uma tabela de streaming. Ao criar uma tabela privada, você cria a tabela, mas não cria os metadados para a tabela. A PRIVATE cláusula instrui o SDP a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela privada persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.
As tabelas privadas podem ter o mesmo nome que as tabelas no catálogo. Se você especificar um nome não qualificado para uma tabela dentro de um pipeline, se houver uma tabela privada e uma tabela de catálogo com esse nome, a tabela privada será usada.
As mesas privadas eram anteriormente referidas como mesas temporárias.
Apagar permanentemente registos de uma visualização materializada ou de uma tabela de streaming
Para excluir permanentemente registros de uma tabela de streaming com vetores de exclusão habilitados, como para conformidade com o GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.
As exibições materializadas sempre refletem os dados nas tabelas subjacentes quando são atualizadas. Para excluir dados em uma exibição Materializada, você deve excluir os dados da fonte e atualizar a exibição materializada.
Parametrizar valores usados ao declarar tabelas ou exibições com SQL
Use SET para especificar um valor de configuração em uma consulta que declara uma tabela ou exibição, incluindo configurações do Spark. Qualquer tabela ou vista que defina num ficheiro de origem depois da declaração SET terá acesso ao valor definido. Todas as configurações do Spark especificadas usando a instrução SET são usadas ao executar a consulta Spark para qualquer tabela ou exibição após a instrução SET. Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de cadeia de caracteres ${}. O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.
Limitações
A cláusula PIVOT não é suportada. A operação pivot no Spark requer o carregamento ansioso de dados de entrada para calcular o esquema de saída. Essa funcionalidade não é suportada em pipelines.
Observação
A sintaxe CREATE OR REFRESH LIVE TABLE para criar uma exibição materializada foi preterida. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW.