Поделиться через


материализованное представление

Декоратор @materialized_view можно использовать для определения материализованных представлений в конвейере.

Чтобы определить материализованное представление, примените @materialized_view к запросу, который выполняет пакетное чтение в источнике данных.

Синтаксис

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Параметры

@dp.expect() является факультативной клаузой экспектации. Можно включить несколько ожиданий. См. ожидания.

Параметр Тип Description
функция function Обязательное. Функция, возвращающая пакетный кадр данных Apache Spark из определяемого пользователем запроса.
name str Имя таблицы. Если этот параметр не указан, по умолчанию используется имя функции.
comment str Описание таблицы.
spark_conf dict Список конфигураций Spark для выполнения этого запроса
table_properties dict Набор dictсвойств таблицы для таблицы.
path str Расположение хранилища для данных таблицы. Если не задано, используйте управляемое расположение хранилища для схемы, содержащей таблицу.
partition_cols list Список одного или нескольких столбцов, используемых для секционирования таблицы.
cluster_by_auto bool Включите автоматическое кластеризация жидкости в таблице. Это можно объединить и cluster_by определить столбцы, которые следует использовать в качестве начальных ключей кластеризации, а затем мониторинг и автоматическое обновление выбора ключей на основе рабочей нагрузки. См. автоматическая кластеризация жидкости.
cluster_by list Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. раздел "Использование кластеризации жидкости" для таблиц.
schema str или StructType Определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType.
private bool Создайте таблицу, но не публикуйте таблицу в хранилище метаданных. Эта таблица доступна конвейеру, но не может быть использована за его пределами. Частные таблицы сохраняются в течение времени существования конвейера.
Значение по умолчанию — False.
row_filter str (общественная предварительная версия) Условие фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов.

Указание схемы является необязательным и может выполняться с помощью PySpark StructType или SQL DDL. При указании схемы можно дополнительно включать созданные столбцы, маски столбцов и первичные и внешние ключи. See:

Примеры

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")