Condividi tramite


table

Il decoratore @table può essere usato per definire le tabelle di streaming in una pipeline.

Per definire una tabella di streaming, applicare @table a una query che esegue una lettura in streaming da un'origine dati o usare la funzione create_streaming_table().

Annotazioni

Nel modulo precedente dlt l'operatore @table è stato usato per creare tabelle di streaming e viste materializzate. L'operatore @table nel pyspark.pipelines modulo funziona ancora in questo modo, ma Databricks consiglia di usare l'operatore @materialized_view per creare viste materializzate.

Sintassi

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parametri

@dp.expect() è una clausola di aspettativa facoltativa delle pipeline dichiarative di Lakeflow Spark. È possibile includere più aspettative. Vedere Aspettative.

Parametro TIPO Description
funzione function Obbligatorio. Funzione che restituisce un dataframe di streaming Apache Spark da una query definita dall'utente.
name str Nome della tabella. Se non specificato, per impostazione predefinita viene impostato il nome della funzione.
comment str Descrizione della tabella.
spark_conf dict Elenco delle configurazioni di Spark per l'esecuzione di questa query
table_properties dict Un dict di proprietà della tabella relativo alla tabella.
path str Posizione di archiviazione per i dati della tabella. Se non è impostato, usare il percorso di archiviazione gestito per lo schema contenente la tabella.
partition_cols list Elenco di una o più colonne da utilizzare per il partizionamento della tabella.
cluster_by_auto bool Abilitare il clustering liquido automatico nella tabella. Può essere combinato con cluster_by e definire le colonne da usare come chiavi di clustering iniziali, seguite dal monitoraggio e dagli aggiornamenti automatici della selezione delle chiavi in base al carico di lavoro. Per ulteriori informazioni, vedere Clustering liquido automatico.
cluster_by list Abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering. Vedere Usare clustering liquido per le tabelle.
schema str o StructType Definizione dello schema per la tabella. Gli schemi possono essere definiti come una stringa SQL DDL o con il linguaggio Python StructType.
private bool Creare una tabella, ma non pubblicare la tabella nel metastore. Tale tabella è disponibile per la pipeline, ma non è accessibile all'esterno della pipeline. Le tabelle private vengono mantenute per la durata della pipeline.
Il valore predefinito è False.
Le tabelle private sono state create in precedenza con il temporary parametro .
row_filter str (Anteprima pubblica) Clausola di filtro di riga per la tabella. Vedere Pubblicare tabelle con filtri di riga e maschere di colonna.

Specificare uno schema è facoltativo e può essere eseguito con PySpark StructType o SQL DDL. Quando si specifica uno schema, è possibile includere facoltativamente colonne generate, maschere di colonna e chiavi primarie ed esterne. See:

Esempi

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")