Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dekorator @materialized_view może służyć do definiowania zmaterializowanych widoków w potoku.
Aby zdefiniować zmaterializowany widok, zastosuj do @materialized_view zapytania, które wykonuje odczyty wsadowe ze źródła danych.
Składnia
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Parametry
@dp.expect() jest opcjonalną klauzulą oczekiwania. Możesz uwzględnić wiele oczekiwań. Zobacz Oczekiwania.
| Parameter | Typ | Description |
|---|---|---|
| funkcja | function |
To jest wymagane. Funkcja zwracająca wsadową ramkę danych platformy Apache Spark z zapytania zdefiniowanego przez użytkownika. |
name |
str |
Nazwa tabeli Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji. |
comment |
str |
Opis tabeli. |
spark_conf |
dict |
Lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania |
table_properties |
dict |
A dictwłaściwości tabeli dla tabeli. |
path |
str |
Lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, użyj zarządzanej lokalizacji magazynu dla schematu zawierającego tabelę. |
partition_cols |
list |
Lista co najmniej jednej kolumny używanej do partycjonowania tabeli. |
cluster_by_auto |
bool |
Włącz automatyczne klastrowanie cieczy w tabeli. Można to połączyć z cluster_by i definiować kolumny, które będą używane jako początkowe klucze klastrowania, a następnie monitorować oraz automatycznie aktualizować wybór kluczy na podstawie obciążenia pracą. Zobacz Automatyczne klastrowanie cieczy. |
cluster_by |
list |
Włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Używaj płynnego grupowania dla tabel. |
schema |
str lub StructType |
Definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType. |
private |
bool |
Utwórz tabelę, ale nie publikuj tabeli w magazynie metadanych. Ta tabela jest dostępna dla kanału, ale nie jest dostępna poza kanałem. Tabele prywatne są przechowywane przez cały okres działania potoku. Wartość domyślna to False. |
row_filter |
str |
(Publiczna wersja zapoznawcza) Klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn. |
Określanie schematu jest opcjonalne i można to zrobić za pomocą narzędzia PySpark StructType lub SQL DDL. Po określeniu schematu można opcjonalnie uwzględnić wygenerowane kolumny, maski kolumn oraz klucze podstawowe i obce. Zobacz:
- kolumny generowane przez Delta Lake
- Ograniczenia usługi Azure Databricks
- Publikowanie tabel z filtrami wierszy i maskami kolumn.
Przykłady
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")