Partilhar via


tabela

O decorador @table pode ser usado para definir tabelas de streaming num pipeline.

Para definir uma tabela de streaming, aplique @table a uma consulta que execute uma leitura de streaming em relação a uma fonte de dados ou use a função create_streaming_table().

Observação

No módulo mais antigo dlt , o @table operador era usado para criar tabelas de streaming e visualizações materializadas. O operador @table no módulo pyspark.pipelines ainda funciona desta forma, mas a Databricks recomenda usar o operador @materialized_view para criar visualizações materializadas.

Sintaxe

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parâmetros

@dp.expect() é uma cláusula opcional de expectativa das Lakeflow Spark Declarative Pipelines. Você pode incluir várias expectativas. Veja Expectativas.

Parâmetro Tipo Description
função function Required. Uma função que retorna um DataFrame de streaming Apache Spark de uma consulta definida pelo usuário.
name str O nome da tabela. Se não for fornecido, o padrão será o nome da função.
comment str Uma descrição para a tabela.
spark_conf dict Uma lista de configurações do Spark para a execução desta consulta
table_properties dict Um dict de propriedades da tabela para a tabela.
path str Um local de armazenamento para dados de tabela. Se não estiver definido, use o local de armazenamento gerenciado para o esquema que contém a tabela.
partition_cols list Uma lista de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by_auto bool Habilite o agrupamento automático de líquidos na mesa. Isso pode ser combinado e cluster_by definir as colunas a serem usadas como chaves iniciais de clustering, seguidas por monitoramento e atualizações automáticas de seleção de chaves com base na carga de trabalho. Consulte Agrupamento automático de líquidos.
cluster_by list Ative o agrupamento líquido na tabela e defina as colunas que serão utilizadas como chaves de agrupamento. Veja Utilizar clustering líquido para tabelas.
schema str ou StructType Uma definição de esquema para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com Python StructType.
private bool Crie uma tabela, mas não publique a tabela no metastore. Essa tabela está disponível para o gasoduto, mas não está acessível fora do gasoduto. As tabelas privadas persistem durante o ciclo de vida do pipeline.
A predefinição é False.
As tabelas privadas foram previamente criadas com o temporary parâmetro.
row_filter str (Pré-visualização Pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

A especificação de um esquema é opcional e pode ser feita com PySpark StructType ou SQL DDL. Ao especificar um esquema, você pode, opcionalmente, incluir colunas geradas, máscaras de coluna e chaves primárias e estrangeiras. See:

Examples

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")