Aracılığıyla paylaş


table

Bu @table dekoratör, işlem hattında akış tabloları tanımlamak için kullanılabilir.

Akış tablosu tanımlamak için, veri kaynağında akış okuması gerçekleştiren bir sorguya @table uygulayın veya create_streaming_table() işlevini kullanın.

Uyarı

Eski dlt modülde @table işleç hem akış tabloları hem de gerçekleştirilmiş görünümler oluşturmak için kullanılırdı. @table Modüldeki pyspark.pipelines işleç bu şekilde çalışmaya devam eder, ancak Databricks malzeme görünümleri oluşturmak için @materialized_view işlecini kullanmanızı önerir.

Sözdizimi

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = <bool>,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = <bool>)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parametreler

@dp.expect() isteğe bağlı bir Lakeflow Spark Deklaratif İşlem Hatları beklenti cümleciğidir. Birden çok beklenti ekleyebilirsiniz. Bkz. Beklentiler.

Parametre Türü Description
fonksiyon function Gerekli. Kullanıcı tanımlı bir sorgudan Apache Spark akış DataFrame'i döndüren işlev.
name str Tablo adı. Sağlanmadıysa, varsayılan olarak işlev adını kullanır.
comment str Tablo için bir açıklama.
spark_conf dict Bu sorgunun yürütülmesi için Spark yapılandırmalarının listesi
table_properties dict dict Tablonun tablo özelliklerinden biri.
path str Tablo verileri için bir depolama konumu. Ayarlanmadıysa, tabloyu içeren şema için yönetilen depolama konumunu kullanın.
partition_cols list Tabloyu bölümlendirmek için kullanılacak bir veya daha fazla sütunun listesi.
cluster_by_auto bool Tabloda otomatik sıvı kümelemasını etkinleştirin. Bu, ilk kümeleme anahtarları olarak kullanılacak sütunlarla cluster_by birleştirilebilir ve tanımlanabilir, ardından iş yüküne göre izleme ve otomatik anahtar seçimi güncelleştirmeleri gerçekleştirilir. Bkz. Otomatik sıvı kümeleme.
cluster_by list Tabloda sıvı kümelemeye olanak tanıyın ve kümeleme anahtarları olarak kullanılacak sütunları tanımlayın. Bkz Tablolar için sıvı kümeleme kullanma.
schema str veya StructType Tablo için şema tanımı. Şemalar bir SQL DDL dizesi olarak veya Python StructTypeile tanımlanabilir.
private bool Bir tablo oluşturun, ancak tabloyu meta veri deposunda yayımlamayın. Bu tablo işlem hattına açıktır ancak işlem hattı dışından erişilemez. Özel tablolar işlem hattının ömrü boyunca kalır.
Varsayılan değer: False.
Özel tablolar daha önce parametresiyle temporary oluşturulmuştur.
row_filter str (Genel Önizleme) Tablo için satır filtresi yan tümcesi. Bkz. Satır filtreleri ve sütun maskeleriyle tabloları yayımlama.

Şema belirtmek isteğe bağlıdır ve PySpark StructType veya SQL DDL ile yapılabilir. Bir şema belirttiğinizde, isteğe bağlı olarak oluşturulan sütunları, sütun maskelerini ve birincil ve yabancı anahtarları ekleyebilirsiniz. See:

Örnekler

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")