Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Декоратор @materialized_view можно использовать для определения материализованных представлений в конвейере.
Чтобы определить материализованное представление, примените @materialized_view к запросу, который выполняет пакетное чтение в источнике данных.
Синтаксис
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"},
table_properties = {"<key>" : "<value>", "<key>" : "<value>"},
path = "<storage-location-path>",
partition_cols = ["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema = "schema-definition",
refresh_policy = <policy>,
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Параметры
@dp.expect() является факультативной клаузой экспектации. Можно включить несколько ожиданий. См. ожидания.
| Параметр | Тип | Description |
|---|---|---|
| функция | function |
Обязательное. Функция, возвращающая пакетный кадр данных Apache Spark из определяемого пользователем запроса. |
name |
str |
Имя таблицы. Если этот параметр не указан, по умолчанию используется имя функции. |
comment |
str |
Описание таблицы. |
spark_conf |
dict |
Список конфигураций Spark для выполнения этого запроса |
table_properties |
dict |
Набор dictсвойств таблицы для таблицы. |
path |
str |
Расположение хранилища для данных таблицы. Если не задано, используйте управляемое расположение хранилища для схемы, содержащей таблицу. |
partition_cols |
list |
Список одного или нескольких столбцов, используемых для секционирования таблицы. |
cluster_by_auto |
bool |
Включите автоматическое кластеризация жидкости в таблице. Это можно объединить и cluster_by определить столбцы, которые следует использовать в качестве начальных ключей кластеризации, а затем мониторинг и автоматическое обновление выбора ключей на основе рабочей нагрузки. См. автоматическая кластеризация жидкости. |
cluster_by |
list |
Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. раздел "Использование кластеризации жидкости" для таблиц. |
schema |
str или StructType |
Определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType. |
private |
bool |
Создайте таблицу, но не публикуйте таблицу в хранилище метаданных. Эта таблица доступна конвейеру, но не может быть использована за его пределами. Частные таблицы сохраняются в течение времени существования конвейера. Значение по умолчанию — False. |
refresh_policy |
str |
(бета-версия) Строка, определяющая политику обновления для материализованного представления. Одно из следующих: auto, incrementalили incremental_strictfull. См. статью "Обновить политику" и REFRESH предложение POLICY (конвейеры).Значение по умолчанию — auto. |
row_filter |
str |
(общественная предварительная версия) Условие фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов. |
Указание схемы является необязательным и может выполняться с помощью PySpark StructType или SQL DDL. При указании схемы можно дополнительно включать созданные столбцы, маски столбцов и первичные и внешние ключи. See:
- Сгенерированные столбцы Delta Lake
- Ограничения в Azure Databricks
- Публикация таблиц с фильтрами строк и масками столбцов.
Примеры
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")
# Specify a refresh policy
@dp.materialized_view(
refresh_policy = 'incremental_strict'
)
def sales():
return ("...")