Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O decorador @table pode ser usado para definir tabelas de streaming num pipeline.
Para definir uma tabela de streaming, aplique @table a uma consulta que execute uma leitura de streaming em relação a uma fonte de dados ou use a função create_streaming_table().
Observação
No módulo mais antigo dlt , o @table operador era usado para criar tabelas de streaming e visualizações materializadas. O operador @table no módulo pyspark.pipelines ainda funciona desta forma, mas a Databricks recomenda usar o operador @materialized_view para criar visualizações materializadas.
Sintaxe
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Parâmetros
@dp.expect() é uma cláusula opcional de expectativa das Lakeflow Spark Declarative Pipelines. Você pode incluir várias expectativas. Veja Expectativas.
| Parâmetro | Tipo | Description |
|---|---|---|
| função | function |
Required. Uma função que retorna um DataFrame de streaming Apache Spark de uma consulta definida pelo usuário. |
name |
str |
O nome da tabela. Se não for fornecido, o padrão será o nome da função. |
comment |
str |
Uma descrição para a tabela. |
spark_conf |
dict |
Uma lista de configurações do Spark para a execução desta consulta |
table_properties |
dict |
Um dict de propriedades da tabela para a tabela. |
path |
str |
Um local de armazenamento para dados de tabela. Se não estiver definido, use o local de armazenamento gerenciado para o esquema que contém a tabela. |
partition_cols |
list |
Uma lista de uma ou mais colunas a serem usadas para particionar a tabela. |
cluster_by_auto |
bool |
Habilite o agrupamento automático de líquidos na mesa. Isso pode ser combinado e cluster_by definir as colunas a serem usadas como chaves iniciais de clustering, seguidas por monitoramento e atualizações automáticas de seleção de chaves com base na carga de trabalho. Consulte Agrupamento automático de líquidos. |
cluster_by |
list |
Ative o agrupamento líquido na tabela e defina as colunas que serão utilizadas como chaves de agrupamento. Veja Utilizar clustering líquido para tabelas. |
schema |
str ou StructType |
Uma definição de esquema para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com Python StructType. |
private |
bool |
Crie uma tabela, mas não publique a tabela no metastore. Essa tabela está disponível para o gasoduto, mas não está acessível fora do gasoduto. As tabelas privadas persistem durante o ciclo de vida do pipeline. A predefinição é False.As tabelas privadas foram previamente criadas com o temporary parâmetro. |
row_filter |
str |
(Pré-visualização Pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
A especificação de um esquema é opcional e pode ser feita com PySpark StructType ou SQL DDL. Ao especificar um esquema, você pode, opcionalmente, incluir colunas geradas, máscaras de coluna e chaves primárias e estrangeiras. See:
- Colunas geradas pelo Lago Delta
- Restrições no Azure Databricks
- Publique tabelas com filtros de linha e máscaras de coluna.
Examples
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")