데코레이터를 @table 사용하여 파이프라인에서 스트리밍 테이블을 정의할 수 있습니다.
스트리밍 테이블을 정의하려면 데이터 원본에 대해 스트리밍 읽기를 수행하는 쿼리에 @table을 적용하거나 create_streaming_table() 함수를 사용합니다.
비고
이전 dlt 모듈에서 연산자는 @table 스트리밍 테이블과 구체화된 뷰를 모두 만드는 데 사용되었습니다. 모듈의 @table 연산자는 pyspark.pipelines 여전히 이런 방식으로 작동하지만 Databricks는 연산자를 @materialized_view 사용하여 구체화된 뷰를 만드는 것이 좋습니다.
Syntax
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
매개 변수
@dp.expect() 는 Lakeflow Spark의 선언적 파이프라인에서 사용되는 선택적인 예상 절입니다. 여러 기대치를 포함할 수 있습니다.
기대치를 참조하세요.
| 매개 변수 | 유형 | Description |
|---|---|---|
| 기능 | function |
필수 사항입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다. |
name |
str |
테이블의 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다. |
comment |
str |
테이블에 대한 설명입니다. |
spark_conf |
dict |
이 쿼리 실행을 위한 Spark 구성 목록 |
table_properties |
dict |
dict 테이블의 테이블 속성입니다. |
path |
str |
테이블 데이터의 스토리지 위치입니다. 설정하지 않은 경우 테이블을 포함하는 스키마에 대해 관리되는 스토리지 위치를 사용합니다. |
partition_cols |
list |
테이블을 분할하는 데 사용할 하나 이상의 열 목록입니다. |
cluster_by_auto |
bool |
테이블에서 자동 액체 클러스터링을 사용하도록 설정합니다. 이를 초기 클러스터링 키로 cluster_by 사용할 열과 결합하고 정의한 다음 워크로드에 따라 모니터링 및 자동 키 선택 업데이트를 수행할 수 있습니다.
자동 액체 클러스터링을 참조하세요. |
cluster_by |
list |
테이블에서 액체 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. 테이블에 대한 액체 클러스터링 사용을 참조하세요. |
schema |
str 또는 StructType |
테이블에 대한 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python StructType으로 정의할 수 있습니다 |
private |
bool |
테이블을 만들지만 메타스토어에 테이블을 게시하지 마세요. 해당 테이블은 파이프라인에서 사용할 수 있지만 파이프라인 외부에서는 액세스할 수 없습니다. 프라이빗 테이블은 파이프라인의 수명 동안 유지됩니다. 기본값은 False입니다.프라이빗 테이블은 이전에 매개 변수를 사용하여 temporary 만들어졌습니다. |
row_filter |
str |
(공개 체험판) 테이블에 대한 행 필터 조건입니다. 행 필터 및 열 마스크가 있는 테이블 게시을 참조하세요. |
스키마 지정은 선택 사항이며 PySpark StructType 또는 SQL DDL을 사용하여 수행할 수 있습니다. 스키마를 지정할 때 생성된 열, 열 마스크, 기본 및 외장 키를 선택적으로 포함할 수 있습니다. See:
예시
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")