다음을 통해 공유


구체화된 뷰

데코레이터를 @materialized_view 사용하여 파이프라인에서 구체화된 뷰를 정의할 수 있습니다.

구체화된 뷰를 정의하려면 데이터 원본에 대해 일괄 읽기를 수행하는 쿼리에 적용 @materialized_view 합니다.

Syntax

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

매개 변수

@dp.expect() 는 선택적 기대 조건 절입니다. 여러 기대치를 포함할 수 있습니다. 기대치를 참조하세요.

매개 변수 유형 Description
기능 function 필수 사항입니다. 사용자 정의 쿼리에서 Apache Spark 일괄 처리 DataFrame을 반환하는 함수입니다.
name str 테이블의 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다.
comment str 테이블에 대한 설명입니다.
spark_conf dict 이 쿼리 실행을 위한 Spark 구성 목록
table_properties dict dict 테이블의 테이블 속성입니다.
path str 테이블 데이터의 스토리지 위치입니다. 설정하지 않은 경우 테이블을 포함하는 스키마에 대해 관리되는 스토리지 위치를 사용합니다.
partition_cols list 테이블을 분할하는 데 사용할 하나 이상의 열 목록입니다.
cluster_by_auto bool 테이블에서 자동 액체 클러스터링을 사용하도록 설정합니다. 이를 초기 클러스터링 키로 cluster_by 사용할 열과 결합하고 정의한 다음 워크로드에 따라 모니터링 및 자동 키 선택 업데이트를 수행할 수 있습니다. 자동 액체 클러스터링을 참조하세요.
cluster_by list 테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. 테이블에 대한 액체 클러스터링 사용을 참조하세요.
schema str 또는 StructType 테이블에 대한 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python StructType으로 정의할 수 있습니다
private bool 테이블을 만들지만 메타스토어에 테이블을 게시하지 마세요. 해당 테이블은 파이프라인에서 사용할 수 있지만 파이프라인 외부에서는 액세스할 수 없습니다. 프라이빗 테이블은 파이프라인의 수명 동안 유지됩니다.
기본값은 False입니다.
row_filter str (공개 체험판) 테이블에 대한 행 필터 조건입니다. 행 필터 및 열 마스크가 있는 테이블 게시을 참조하세요.

스키마 지정은 선택 사항이며 PySpark StructType 또는 SQL DDL을 사용하여 수행할 수 있습니다. 스키마를 지정할 때 생성된 열, 열 마스크, 기본 및 외장 키를 선택적으로 포함할 수 있습니다. See:

예시

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")