CRIAR TABELA DE STREAMING
Aplica-se a: Databricks SQL
Databricks Runtime 13.3 LTS e superior
Importante
Esta funcionalidade está em Pré-visualização Pública.
Cria uma tabela de streaming, uma tabela Delta com suporte extra para streaming ou processamento incremental de dados.
As tabelas de streaming só são suportadas no Delta Live Tables e no Databricks SQL com Unity Catalog. A execução deste comando no Databricks Runtime compute suportado analisa apenas a sintaxe. Consulte Implementar um pipeline Delta Live Tables com SQL.
Sintaxe
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] |
WITH { ROW FILTER clause } } [...]
Parâmetros
REFRESH
Se especificado, atualiza a tabela com os dados mais recentes disponíveis das fontes definidas na consulta. Apenas os novos dados que chegam antes do início da consulta são processados. Novos dados que são adicionados às fontes durante a execução do comando são ignorados até a próxima atualização.
SE NÃO EXISTIR
Se especificado e já existir uma tabela com o mesmo nome, a instrução será ignorada.
IF NOT EXISTS
não pode ser usado em conjunto comREFRESH
, o que significaCREATE OR REFRESH TABLE IF NOT EXISTS
que não é permitido.-
O nome da tabela a ser criada. O nome não deve incluir uma especificação temporal. Se o nome não estiver qualificado, a tabela será criada no esquema atual.
table_specification
Esta cláusula opcional define a lista de colunas, seus tipos, propriedades, descrições e restrições de coluna.
Se você não definir colunas no esquema da tabela, deverá especificar
AS query
.-
Um nome exclusivo para a coluna.
-
Especifica o tipo de dados da coluna.
NÃO NULO
Se especificado, a coluna não aceita
NULL
valores.COMENTAR column_comment
Um literal de cadeia de caracteres para descrever a coluna.
-
Importante
Esta funcionalidade está em Pré-visualização Pública.
Adiciona uma restrição de chave primária ou chave estrangeira à coluna em uma tabela de streaming. Não há suporte para restrições para tabelas no
hive_metastore
catálogo. -
Importante
Esta funcionalidade está em Pré-visualização Pública.
Adiciona uma função de máscara de coluna para anonimizar dados confidenciais. Todas as consultas futuras dessa coluna receberão o resultado da avaliação dessa função sobre a coluna no lugar do valor original da coluna. Isso pode ser útil para fins de controle de acesso refinado, em que a função pode inspecionar a identidade e/ou associações de grupo do usuário que invoca para decidir se deseja redigir o valor.
RESTRIÇÃO expectation_name ESPERAR (expectation_expr) [ ON VIOLATION { FAIL UPDATE | LINHA DE QUEDA } ]
Adiciona expectativas de qualidade de dados à tabela. Essas expectativas de qualidade de dados podem ser acompanhadas ao longo do tempo e acessadas por meio do log de eventos da tabela de streaming. Uma
FAIL UPDATE
expectativa faz com que o processamento falhe ao criar a tabela, bem como ao atualizá-la. UmaDROP ROW
expectativa faz com que toda a linha seja abandonada se a expectativa não for atendida.expectation_expr
pode ser composto por literais, identificadores de coluna dentro da tabela e determinísticos, funções ou operadores SQL internos, exceto:- Funções agregadas
- Funções de janela analítica
- Funções da janela de classificação
- Funções do gerador com valor de tabela
Também
expr
não deve conter nenhuma subconsulta.- Funções agregadas
-
Importante
Esta funcionalidade está em Pré-visualização Pública.
Adiciona uma chave primária informativa ou restrições de chave estrangeira informativa a uma tabela de streaming. Não há suporte para restrições de chave para tabelas no
hive_metastore
catálogo.
-
-
table_clauses
Opcionalmente, especifique particionamento, comentários, propriedades definidas pelo usuário e uma agenda de atualização para a nova tabela. Cada subcláusula só pode ser especificada uma vez.
-
Uma lista opcional de colunas da tabela para particionar a tabela por.
COMENTAR table_comment
Um
STRING
literal para descrever a tabela.-
Opcionalmente, define uma ou mais propriedades definidas pelo usuário.
SCHEDULE [ REFRESH ] CRON cron_string [ NO FUSO HORÁRIO timezone_id ]
Se fornecido, agenda a tabela de streaming ou a visualização materializada para atualizar seus dados com a programação de quartzo cron dada. Apenas time_zone_values são aceites.
AT TIME ZONE LOCAL
não é suportado. SeAT TIME ZONE
estiver ausente, o fuso horário da sessão será usado. SeAT TIME ZONE
estiver ausente e o fuso horário da sessão não estiver definido, um erro será lançado.SCHEDULE
é semanticamente equivalente aSCHEDULE REFRESH
.Não é possível usar a
SCHEDULE
sintaxe em uma definição de pipeline Delta Live Tables.A
SCHEDULE
cláusula não é permitida em umCREATE OR REFRESH
comando. A programação pode ser fornecida como parte doCREATE
comando. Use ALTER STREAMING TABLE para alterar a programação de uma tabela de streaming após a criação.Cláusula COM FILTRO DE LINHA
Importante
Esta funcionalidade está em Pré-visualização Pública.
Adiciona uma função de filtro de linha à tabela. Todas as consultas futuras dessa tabela receberão um subconjunto de suas linhas para as quais a função é avaliada como booleana TRUE. Isso pode ser útil para fins de controle de acesso refinado, em que a função pode inspecionar a identidade e/ou associações de grupo do usuário que invoca para decidir se deseja filtrar determinadas linhas.
-
-
Esta cláusula preenche a tabela usando os dados de
query
. Essa consulta deve ser uma consulta de streaming . Isso pode ser conseguido adicionando aSTREAM
palavra-chave a qualquer relação que você queira processar incrementalmente. Quando você especifica umquery
e umtable_specification
juntos, o esquema de tabela especificado emtable_specification
deve conter todas as colunas retornadas peloquery
, caso contrário, você receberá um erro. Todas as colunas especificadas,table_specification
mas não retornadas porquery
valores de retornonull
quando consultadas.Esta cláusula é necessária para tabelas de streaming criadas no Databricks SQL, mas não é necessária no Delta Live Tables. Se essa cláusula não for fornecida no Delta Live Tables, você deverá fazer referência a essa tabela em um
APPLY CHANGES
comando no pipeline de DLT. Consulte Alterar captura de dados com SQL em Delta Live Tables.
Diferenças entre tabelas de streaming e outras tabelas
As tabelas de streaming são tabelas com monitoração de estado, projetadas para lidar com cada linha apenas uma vez enquanto você processa um conjunto de dados crescente. Como a maioria dos conjuntos de dados cresce continuamente ao longo do tempo, as tabelas de streaming são boas para a maioria das cargas de trabalho de ingestão. As tabelas de streaming são ideais para pipelines que exigem atualização de dados e baixa latência. As tabelas de streaming também podem ser úteis para transformações em grande escala, já que os resultados podem ser calculados incrementalmente à medida que novos dados chegam, mantendo os resultados atualizados sem a necessidade de recalcular totalmente todos os dados de origem a cada atualização. As tabelas de streaming são projetadas para fontes de dados que são somente acréscimo.
As tabelas de streaming aceitam comandos adicionais, como REFRESH
, que processa os dados mais recentes disponíveis nas fontes fornecidas na consulta. As alterações na consulta fornecida só são refletidas em novos dados chamando um REFRESH
, dados não processados anteriormente. Para aplicar as alterações nos dados existentes também, você precisa executar REFRESH TABLE <table_name> FULL
para executar um FULL REFRESH
arquivo . As atualizações completas reprocessam todos os dados disponíveis na fonte com a definição mais recente. Não é recomendável chamar atualizações completas em fontes que não mantêm todo o histórico dos dados ou têm períodos de retenção curtos, como Kafka, pois a atualização completa trunca os dados existentes. Talvez não seja possível recuperar dados antigos se os dados não estiverem mais disponíveis na fonte.
Filtros de linha e máscaras de coluna
Importante
Esta funcionalidade está em Pré-visualização Pública.
Os filtros de linha permitem especificar uma função que se aplica como um filtro sempre que uma verificação de tabela busca linhas. Esses filtros garantem que as consultas subsequentes retornem apenas linhas para as quais o predicado do filtro seja avaliado como true.
As máscaras de coluna permitem mascarar os valores de uma coluna sempre que uma verificação de tabela busca linhas. Todas as consultas futuras envolvendo essa coluna receberão o resultado da avaliação da função sobre a coluna, substituindo o valor original da coluna.
Para obter mais informações sobre como usar filtros de linha e máscaras de coluna, consulte Filtrar dados de tabela confidenciais usando filtros de linha e máscaras de coluna.
Gerenciando filtros de linha e máscaras de coluna
Filtros de linha e máscaras de coluna em tabelas de streaming devem ser adicionados, atualizados ou descartados através da CREATE OR REFRESH
instrução.
Comportamento
- Atualizar como Definer: quando as instruções or
REFRESH
atualizam uma tabela de streaming, asCREATE OR REFRESH
funções de filtro de linha são executadas com os direitos do definer (como proprietário da tabela). Isso significa que a atualização da tabela usa o contexto de segurança do usuário que criou a tabela de streaming. - Consulta: Embora a maioria dos filtros seja executada com os direitos do definidor, as funções que verificam o contexto do usuário (como
CURRENT_USER
eIS_MEMBER
) são exceções. Essas funções são executadas como o invocador. Essa abordagem impõe segurança de dados específicos do usuário e controles de acesso com base no contexto do usuário atual.
Observabilidade
Use DESCRIBE EXTENDED
, INFORMATION_SCHEMA
ou o Gerenciador de Catálogos para examinar os filtros de linha e as máscaras de coluna existentes que se aplicam a uma determinada tabela de streaming. Essa funcionalidade permite que os usuários auditem e revisem o acesso a dados e as medidas de proteção em tabelas de streaming.
Limitações
Apenas os proprietários de tabelas podem atualizar as tabelas de streaming para obter os dados mais recentes.
ALTER TABLE
Os comandos não são permitidos em tabelas de streaming. A definição e as propriedades da tabela devem ser alteradas através daCREATE OR REFRESH
instrução orALTER STREAMING TABLE
.Não há suporte para consultas de viagem no tempo.
Não há suporte para a evolução do esquema de tabela por meio de comandos DML como
INSERT INTO
, eMERGE
não há suporte.Os seguintes comandos não são suportados em tabelas de streaming:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
O compartilhamento delta não é suportado.
Não há suporte para renomear a tabela ou alterar o proprietário.
Restrições de tabela, como
PRIMARY KEY
eFOREIGN KEY
não são suportadas.Não há suporte para colunas geradas, colunas de identidade e colunas padrão.
Exemplos
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')
Artigos relacionados
Comentários
https://aka.ms/ContentUserFeedback.
Brevemente: Ao longo de 2024, vamos descontinuar progressivamente o GitHub Issues como mecanismo de feedback para conteúdos e substituí-lo por um novo sistema de feedback. Para obter mais informações, veja:Submeter e ver comentários