@materialized_viewデコレーターを使用して、パイプラインで具体化されたビューを定義できます。
具体化されたビューを定義するには、データ ソースに対してバッチ読み取りを実行するクエリに @materialized_view を適用します。
構文
from pyspark import pipelines as dp
@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
パラメーター
@dp.expect() は省略可能な期待句です。 複数の期待値を含めることができます。
「期待値」を参照してください。
| パラメーター | タイプ | Description |
|---|---|---|
| 機能 | function |
必須。 ユーザー定義クエリから Apache Spark バッチ DataFrame を返す関数。 |
name |
str |
テーブル名。 指定しない場合は、既定で関数名が使用されます。 |
comment |
str |
テーブルの説明です。 |
spark_conf |
dict |
このクエリを実行するための Spark 構成の一覧 |
table_properties |
dict |
dictのテーブル プロパティ群。 |
path |
str |
テーブル データの格納場所。 設定されていない場合は、テーブルを含むスキーマのマネージド ストレージの場所を使用します。 |
partition_cols |
list |
テーブルのパーティション分割に使用する 1 つ以上の列の一覧。 |
cluster_by_auto |
bool |
テーブルで自動液体クラスタリングを有効にします。 これを cluster_by と組み合わせて、初期クラスタリング キーとして使用する列を定義してから、ワークロードに基づく監視と自動キー選択の更新を行うことができます。
自動液体クラスタリングを参照してください。 |
cluster_by |
list |
リキッド クラスタリングをテーブルに対して有効化し、クラスタリング キーとして使用する列を定義します。 表に液体クラスタリングを使用するを参照してください。 |
schema |
str または StructType |
テーブルのスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python StructType を使用して定義できます。 |
private |
bool |
テーブルを作成しますが、メタストアにテーブルを発行しないでください。 そのテーブルはパイプラインで使用できますが、パイプラインの外部からはアクセスできません。 プライベート テーブルは、パイプラインの有効期間中保持されます。 既定値は Falseです。 |
row_filter |
str |
(パブリック プレビュー) テーブル用の行フィルター句。 「行フィルターと列マスクを使用してテーブルを発行する」を参照してください。 |
スキーマの指定は省略可能であり、PySpark StructType または SQL DDL で行うことができます。 スキーマを指定する場合は、必要に応じて、生成された列、列マスク、および主キーと外部キーを含めることができます。 参照:
例示
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")