Compartilhar via


tabela

O @table decorador pode ser usado para definir tabelas de streaming em um pipeline.

Para definir uma tabela de streaming, aplique @table a uma consulta que execute uma leitura de streaming em uma fonte de dados ou use a função create_streaming_table().

Observação

No módulo mais antigo dlt, o operador @table era usado para criar tabelas de streaming e visões materializadas. O @table operador no pyspark.pipelines módulo ainda funciona dessa forma, mas o Databricks recomenda usar o @materialized_view operador para criar exibições materializadas.

Sintaxe

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parâmetros

@dp.expect() é uma cláusula opcional de expectativa do Lakeflow Spark Declarative Pipelines. Você pode incluir várias expectativas. Veja as expectativas.

Parâmetro Tipo Description
função function Obrigatório Uma função que retorna um DataFrame de streaming do Apache Spark de uma consulta definida pelo usuário.
name str O nome da tabela. Se não for fornecido, o padrão será o nome da função.
comment str Uma descrição da tabela.
spark_conf dict Uma lista de configurações do Spark para a execução dessa consulta
table_properties dict Um dict das propriedades da tabela para a tabela.
path str Um local de armazenamento para dados de tabela. Se não estiver definido, use o local de armazenamento gerenciado para o esquema que contém a tabela.
partition_cols list Uma lista de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by_auto bool Habilite o agrupamento automático de líquidos na tabela. Isso pode ser combinado com cluster_by e definir as colunas a serem utilizadas como claves de agrupamento iniciais, seguidas pelo monitoramento e atualizações automáticas de seleção de chaves com base na carga de trabalho. Consulte clusterização automática de líquidos.
cluster_by list Habilite o agrupamento líquido na tabela e defina as colunas a serem usadas como chaves de agrupamento. Consulte Usar clustering líquido para tabelas.
schema str ou StructType Uma definição de esquema para a tabela. Os esquemas podem ser definidos como uma string DDL do SQL ou com um script Python StructType.
private bool Crie uma tabela, mas não publique a tabela no metastore. Essa tabela está disponível para o pipeline, mas não pode ser acessada fora do pipeline. As tabelas privadas persistem durante o tempo de vida do pipeline.
O padrão é False.
Tabelas privadas foram criadas anteriormente com o temporary parâmetro.
row_filter str (Versão prévia pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar as tabelas com os filtros de linha e as máscaras de coluna.

Especificar um esquema é opcional e pode ser feito com o PySpark StructType ou o DDL do SQL. Ao especificar um esquema, opcionalmente, você pode incluir colunas geradas, máscaras de coluna e chaves primárias e estrangeiras. Consulte:

Exemplos

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")