다음을 통해 공유


table

데코레이터를 @table 사용하여 파이프라인에서 스트리밍 테이블을 정의할 수 있습니다.

스트리밍 테이블을 정의하려면 데이터 원본에 대해 스트리밍 읽기를 수행하는 쿼리에 @table을 적용하거나 create_streaming_table() 함수를 사용합니다.

비고

이전 dlt 모듈에서 연산자는 @table 스트리밍 테이블과 구체화된 뷰를 모두 만드는 데 사용되었습니다. 모듈의 @table 연산자는 pyspark.pipelines 여전히 이런 방식으로 작동하지만 Databricks는 연산자를 @materialized_view 사용하여 구체화된 뷰를 만드는 것이 좋습니다.

Syntax

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

매개 변수

@dp.expect() 는 Lakeflow Spark의 선언적 파이프라인에서 사용되는 선택적인 예상 절입니다. 여러 기대치를 포함할 수 있습니다. 기대치를 참조하세요.

매개 변수 유형 Description
기능 function 필수 사항입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다.
name str 테이블의 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다.
comment str 테이블에 대한 설명입니다.
spark_conf dict 이 쿼리 실행을 위한 Spark 구성 목록
table_properties dict dict 테이블의 테이블 속성입니다.
path str 테이블 데이터의 스토리지 위치입니다. 설정하지 않은 경우 테이블을 포함하는 스키마에 대해 관리되는 스토리지 위치를 사용합니다.
partition_cols list 테이블을 분할하는 데 사용할 하나 이상의 열 목록입니다.
cluster_by_auto bool 테이블에서 자동 액체 클러스터링을 사용하도록 설정합니다. 이를 초기 클러스터링 키로 cluster_by 사용할 열과 결합하고 정의한 다음 워크로드에 따라 모니터링 및 자동 키 선택 업데이트를 수행할 수 있습니다. 자동 액체 클러스터링을 참조하세요.
cluster_by list 테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. 테이블에 대한 액체 클러스터링 사용을 참조하세요.
schema str 또는 StructType 테이블에 대한 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python StructType으로 정의할 수 있습니다
private bool 테이블을 만들지만 메타스토어에 테이블을 게시하지 마세요. 해당 테이블은 파이프라인에서 사용할 수 있지만 파이프라인 외부에서는 액세스할 수 없습니다. 프라이빗 테이블은 파이프라인의 수명 동안 유지됩니다.
기본값은 False입니다.
프라이빗 테이블은 이전에 매개 변수를 사용하여 temporary 만들어졌습니다.
row_filter str (공개 체험판) 테이블에 대한 행 필터 조건입니다. 행 필터 및 열 마스크가 있는 테이블 게시을 참조하세요.

스키마 지정은 선택 사항이며 PySpark StructType 또는 SQL DDL을 사용하여 수행할 수 있습니다. 스키마를 지정할 때 생성된 열, 열 마스크, 기본 및 외장 키를 선택적으로 포함할 수 있습니다. See:

예시

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")