Udostępnij za pomocą


tabela

Dekorator @table może służyć do definiowania tabel strumieniowych w potoku.

Aby zdefiniować tabelę przesyłania strumieniowego, zastosuj @table do zapytania, które wykonuje odczyt strumieniowy względem źródła danych lub użyj funkcji create_streaming_table().

Uwaga / Notatka

W starszym dlt module @table operator był używany do tworzenia strumieniowych tabel i materializowanych widoków. Operator @table w module pyspark.pipelines wciąż działa w ten sposób, ale usługa Databricks zaleca korzystanie z operatora @materialized_view do tworzenia zmaterializowanych widoków.

Składnia

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parametry

@dp.expect() jest opcjonalną klauzulą oczekiwania potoków deklaratywnych platformy Spark w usłudze Lakeflow. Możesz uwzględnić wiele oczekiwań. Zobacz Oczekiwania.

Parameter Typ Description
funkcja function To jest wymagane. Funkcja, która zwraca strumieniowy DataFrame w Apache Spark na podstawie zapytania zdefiniowanego przez użytkownika.
name str Nazwa tabeli Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji.
comment str Opis tabeli.
spark_conf dict Lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania
table_properties dict A dictwłaściwości tabeli dla tabeli.
path str Lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, użyj zarządzanej lokalizacji magazynu dla schematu zawierającego tabelę.
partition_cols list Lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
cluster_by_auto bool Włącz automatyczne klastrowanie cieczy w tabeli. Można to połączyć z cluster_by i definiować kolumny, które będą używane jako początkowe klucze klastrowania, a następnie monitorować oraz automatycznie aktualizować wybór kluczy na podstawie obciążenia pracą. Zobacz Automatyczne klastrowanie cieczy.
cluster_by list Włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Używaj płynnego grupowania dla tabel.
schema str lub StructType Definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType.
private bool Utwórz tabelę, ale nie publikuj tabeli w magazynie metadanych. Ta tabela jest dostępna dla kanału, ale nie jest dostępna poza kanałem. Tabele prywatne są przechowywane przez cały okres działania potoku.
Wartość domyślna to False.
Tabele prywatne zostały wcześniej utworzone za pomocą parametru temporary .
row_filter str (Publiczna wersja zapoznawcza) Klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Określanie schematu jest opcjonalne i można to zrobić za pomocą narzędzia PySpark StructType lub SQL DDL. Po określeniu schematu można opcjonalnie uwzględnić wygenerowane kolumny, maski kolumn oraz klucze podstawowe i obce. Zobacz:

Przykłady

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")