Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Декоратор @materialized_view можно использовать для определения материализованных представлений в конвейере.
Чтобы определить материализованное представление, примените @materialized_view к запросу, который выполняет пакетное чтение в источнике данных.
Синтаксис
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Параметры
@dp.expect() является факультативной клаузой экспектации. Можно включить несколько ожиданий. См. ожидания.
| Параметр | Тип | Description |
|---|---|---|
| функция | function |
Обязательное. Функция, возвращающая пакетный кадр данных Apache Spark из определяемого пользователем запроса. |
name |
str |
Имя таблицы. Если этот параметр не указан, по умолчанию используется имя функции. |
comment |
str |
Описание таблицы. |
spark_conf |
dict |
Список конфигураций Spark для выполнения этого запроса |
table_properties |
dict |
Набор dictсвойств таблицы для таблицы. |
path |
str |
Расположение хранилища для данных таблицы. Если не задано, используйте управляемое расположение хранилища для схемы, содержащей таблицу. |
partition_cols |
list |
Список одного или нескольких столбцов, используемых для секционирования таблицы. |
cluster_by_auto |
bool |
Включите автоматическое кластеризация жидкости в таблице. Это можно объединить и cluster_by определить столбцы, которые следует использовать в качестве начальных ключей кластеризации, а затем мониторинг и автоматическое обновление выбора ключей на основе рабочей нагрузки. См. автоматическая кластеризация жидкости. |
cluster_by |
list |
Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. раздел "Использование кластеризации жидкости" для таблиц. |
schema |
str или StructType |
Определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType. |
private |
bool |
Создайте таблицу, но не публикуйте таблицу в хранилище метаданных. Эта таблица доступна конвейеру, но не может быть использована за его пределами. Частные таблицы сохраняются в течение времени существования конвейера. Значение по умолчанию — False. |
row_filter |
str |
(общественная предварительная версия) Условие фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов. |
Указание схемы является необязательным и может выполняться с помощью PySpark StructType или SQL DDL. При указании схемы можно дополнительно включать созданные столбцы, маски столбцов и первичные и внешние ключи. See:
- Сгенерированные столбцы Delta Lake
- Ограничения в Azure Databricks
- Публикация таблиц с фильтрами строк и масками столбцов.
Примеры
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")