Bagikan melalui


tampilan terwujud

Dekorator @materialized_view dapat digunakan untuk menentukan tampilan materialisasi dalam alur pemrosesan.

Untuk menentukan tampilan materialisasi, terapkan @materialized_view ke kueri yang melakukan pembacaan batch terhadap sumber data.

Syntax

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parameter-parameternya

@dp.expect() adalah klausa ekspektasi opsional. Anda dapat menyertakan beberapa ekspektasi. Lihat Ekspektasi.

Pengaturan Tipe Description
fungsi function Dibutuhkan. Fungsi yang mengembalikan DataFrame batch Apache Spark dari kueri yang didefinisikan pengguna.
name str Nama tabel. Jika tidak disediakan, akan otomatis menggunakan nama fungsi.
comment str Deskripsi untuk tabel.
spark_conf dict Daftar konfigurasi Spark untuk eksekusi kueri ini
table_properties dict Sebuah properti dicttabel untuk tabel.
path str Lokasi penyimpanan untuk data tabel. Jika tidak diatur, gunakan lokasi penyimpanan terkelola untuk skema yang berisi tabel.
partition_cols list Daftar satu atau beberapa kolom yang akan digunakan untuk mempartisi tabel.
cluster_by_auto bool Aktifkan pengklusteran cairan otomatis pada tabel. Ini dapat dikombinasikan dengan cluster_by dan menentukan kolom yang akan digunakan sebagai kunci pengklusteran awal, diikuti dengan pemantauan dan pembaruan pemilihan kunci otomatis berdasarkan beban kerja. Lihat pengklusteran cairan otomatis.
cluster_by list Aktifkan pengklusteran cair pada tabel dan tentukan kolom yang akan digunakan sebagai kunci pengklusteran. Lihat Menggunakan pengklusteran cair untuk tabel.
schema str atau StructType Definisi skema untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL atau dengan Python StructType.
private bool Buat tabel, tetapi jangan terbitkan tabel ke metastore. Tabel tersebut tersedia untuk jalur pemrosesan data tetapi tidak dapat diakses di luar jalur tersebut. Tabel privat akan ada selama masa pakai pipelain data.
Defaultnya adalah False.
row_filter str (Pratinjau Umum) Klausa filter baris untuk tabel. Lihat Menerbitkan tabel dengan filter baris dan masker kolom.

Menentukan skema bersifat opsional dan dapat dilakukan dengan PySpark StructType atau SQL DDL. Saat Anda menentukan skema, Anda dapat secara opsional menyertakan kolom yang dihasilkan, masker kolom, dan kunci primer dan asing. See:

Examples

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")