Поделиться через


таблица

Декоратор @table можно использовать для определения как материализованных представлений, так и потоковых таблиц. В Python декларативные конвейеры Lakeflow определяют, следует ли обновлять набор данных в виде материализованного представления или потоковой таблицы на основе определяющего запроса.

Чтобы определить материализованное представление в Python, примените @table к запросу, который выполняет статическое чтение к источнику данных. Чтобы определить потоковую таблицу, примените @table к запросу, который выполняет потоковое чтение из источника данных или используйте функцию create_streaming_table().

Синтаксис

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect(...)
def <function-name>():
    return (<query>)

Параметры

@dlt.expect() является необязательным условием ожидания в Декларативных конвейерах Lakeflow. Можно включить несколько ожиданий. См. ожидания.

Параметр Тип Описание
функция function Обязательное. Функция, которая возвращает кадр данных Apache Spark или потоковый кадр данных из определяемого пользователем запроса.
name str Имя таблицы. Если этот параметр не указан, по умолчанию используется имя функции.
comment str Описание таблицы.
spark_conf dict Список конфигураций Spark для выполнения этого запроса
table_properties dict Набор dictсвойств таблицы для таблицы.
path str Расположение хранилища для данных таблицы. Если не задано, используйте управляемое расположение хранилища для схемы, содержащей таблицу.
partition_cols list Список одного или нескольких столбцов, используемых для секционирования таблицы.
cluster_by list Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. раздел "Использование кластеризации жидкости" для таблиц.
schema str или StructType Определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType.
temporary bool Создайте таблицу, но не публикуйте таблицу в хранилище метаданных. Эта таблица доступна конвейеру, но не может быть использована за его пределами. Временные таблицы сохраняются в течение времени существования конвейера.
Значение по умолчанию — False.
row_filter str (общественная предварительная версия) Условие фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов.

Указание схемы является необязательным и может выполняться с помощью PySpark StructType или SQL DDL. При указании схемы можно дополнительно включать созданные столбцы, маски столбцов и первичные и внешние ключи. См.

Примеры

import dlt

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")