次の方法で共有


テーブル

@tableデコレーターを使用して、パイプライン内のストリーミング テーブルを定義できます。

ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに @table を適用するか、create_streaming_table() 関数を使用します。

以前の dlt モジュールでは、 @table 演算子を使用して、ストリーミング テーブルと具体化されたビューの両方を作成しました。 @table モジュールの pyspark.pipelines 演算子は引き続きこの方法で動作しますが、Databricks では、@materialized_view演算子を使用して具体化されたビューを作成することをお勧めします。

構文

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

パラメーター

@dp.expect() は、オプションの Lakeflow Spark 宣言パイプライン期待句です。 複数の期待値を含めることができます。 「期待値」を参照してください。

パラメーター タイプ Description
機能 function 必須。 ユーザー定義クエリから Apache Spark ストリーミング DataFrame を返す関数。
name str テーブル名。 指定しない場合は、既定で関数名が使用されます。
comment str テーブルの説明です。
spark_conf dict このクエリを実行するための Spark 構成の一覧
table_properties dict dictテーブル プロパティ群
path str テーブル データの格納場所。 設定されていない場合は、テーブルを含むスキーマのマネージド ストレージの場所を使用します。
partition_cols list テーブルのパーティション分割に使用する 1 つ以上の列の一覧。
cluster_by_auto bool テーブルで自動液体クラスタリングを有効にします。 これを cluster_by と組み合わせて、初期クラスタリング キーとして使用する列を定義してから、ワークロードに基づく監視と自動キー選択の更新を行うことができます。 自動液体クラスタリングを参照してください。
cluster_by list リキッド クラスタリングをテーブルに対して有効化し、クラスタリング キーとして使用する列を定義します。 表に液体クラスタリングを使用するを参照してください。
schema str または StructType テーブルのスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python StructType を使用して定義できます。
private bool テーブルを作成しますが、メタストアにテーブルを発行しないでください。 そのテーブルはパイプラインで使用できますが、パイプラインの外部からはアクセスできません。 プライベート テーブルは、パイプラインの有効期間中保持されます。
既定値は Falseです。
プライベート テーブルは、以前に temporary パラメーターを使用して作成されました。
row_filter str (パブリック プレビュー) テーブル用の行フィルター句。 「行フィルターと列マスクを使用してテーブルを発行する」を参照してください。

スキーマの指定は省略可能であり、PySpark StructType または SQL DDL で行うことができます。 スキーマを指定する場合は、必要に応じて、生成された列、列マスク、および主キーと外部キーを含めることができます。 参照:

例示

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")