Udostępnij za pomocą


zmaterializowany widok

Dekorator @materialized_view może służyć do definiowania zmaterializowanych widoków w potoku.

Aby zdefiniować zmaterializowany widok, zastosuj do @materialized_view zapytania, które wykonuje odczyty wsadowe ze źródła danych.

Składnia

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parametry

@dp.expect() jest opcjonalną klauzulą oczekiwania. Możesz uwzględnić wiele oczekiwań. Zobacz Oczekiwania.

Parameter Typ Description
funkcja function To jest wymagane. Funkcja zwracająca wsadową ramkę danych platformy Apache Spark z zapytania zdefiniowanego przez użytkownika.
name str Nazwa tabeli Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji.
comment str Opis tabeli.
spark_conf dict Lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania
table_properties dict A dictwłaściwości tabeli dla tabeli.
path str Lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, użyj zarządzanej lokalizacji magazynu dla schematu zawierającego tabelę.
partition_cols list Lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
cluster_by_auto bool Włącz automatyczne klastrowanie cieczy w tabeli. Można to połączyć z cluster_by i definiować kolumny, które będą używane jako początkowe klucze klastrowania, a następnie monitorować oraz automatycznie aktualizować wybór kluczy na podstawie obciążenia pracą. Zobacz Automatyczne klastrowanie cieczy.
cluster_by list Włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Używaj płynnego grupowania dla tabel.
schema str lub StructType Definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType.
private bool Utwórz tabelę, ale nie publikuj tabeli w magazynie metadanych. Ta tabela jest dostępna dla kanału, ale nie jest dostępna poza kanałem. Tabele prywatne są przechowywane przez cały okres działania potoku.
Wartość domyślna to False.
row_filter str (Publiczna wersja zapoznawcza) Klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Określanie schematu jest opcjonalne i można to zrobić za pomocą narzędzia PySpark StructType lub SQL DDL. Po określeniu schematu można opcjonalnie uwzględnić wygenerowane kolumny, maski kolumn oraz klucze podstawowe i obce. Zobacz:

Przykłady

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")