Compartilhar via


visão materializada

O @materialized_view decorador pode ser usado para definir exibições materializadas em um pipeline.

Para definir uma exibição materializada, aplique-se @materialized_view a uma consulta que executa uma leitura em lote em uma fonte de dados.

Sintaxe

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parâmetros

@dp.expect() é uma cláusula de expectativa opcional. Você pode incluir várias expectativas. Veja as expectativas.

Parâmetro Tipo Description
função function Obrigatório Uma função que retorna um DataFrame em lote do Apache Spark a partir de uma consulta definida pelo usuário.
name str O nome da tabela. Se não for fornecido, o padrão será o nome da função.
comment str Uma descrição da tabela.
spark_conf dict Uma lista de configurações do Spark para a execução dessa consulta
table_properties dict Um dict das propriedades da tabela para a tabela.
path str Um local de armazenamento para dados de tabela. Se não estiver definido, use o local de armazenamento gerenciado para o esquema que contém a tabela.
partition_cols list Uma lista de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by_auto bool Habilite o agrupamento automático de líquidos na tabela. Isso pode ser combinado com cluster_by e definir as colunas a serem utilizadas como claves de agrupamento iniciais, seguidas pelo monitoramento e atualizações automáticas de seleção de chaves com base na carga de trabalho. Consulte clusterização automática de líquidos.
cluster_by list Habilite o agrupamento líquido na tabela e defina as colunas a serem usadas como chaves de agrupamento. Consulte Usar clustering líquido para tabelas.
schema str ou StructType Uma definição de esquema para a tabela. Os esquemas podem ser definidos como uma string DDL do SQL ou com um script Python StructType.
private bool Crie uma tabela, mas não publique a tabela no metastore. Essa tabela está disponível para o pipeline, mas não pode ser acessada fora do pipeline. As tabelas privadas persistem durante o tempo de vida do pipeline.
O padrão é False.
row_filter str (Versão prévia pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar as tabelas com os filtros de linha e as máscaras de coluna.

Especificar um esquema é opcional e pode ser feito com o PySpark StructType ou o DDL do SQL. Ao especificar um esquema, opcionalmente, você pode incluir colunas geradas, máscaras de coluna e chaves primárias e estrangeiras. Consulte:

Exemplos

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")