Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A @table dekorátor használatával egy pipeline-ban streamelési táblák definiálhatók.
Streamelési tábla definiálásához alkalmazzon @table olyan lekérdezésre, amely streamelési olvasást végez egy adatforráson, vagy használja a create_streaming_table() függvényt.
Megjegyzés:
A régebbi dlt modulban az operátort @table a streamelési táblák és a materializált nézetek létrehozására használták. A @table modul operátora pyspark.pipelines továbbra is így működik, de a Databricks azt javasolja, hogy az @materialized_view operátor használatával hozzon létre materializált nézeteket.
Szemantika
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Paraméterek
@dp.expect() egy választható Lakeflow Spark deklaratív folyamat várakozási záradék. Több elvárást is megadhat. Lásd az elvárásokat.
| Paraméter | Típus | Description |
|---|---|---|
| funkció | function |
Szükséges. Egy függvény, amely egy Apache Spark streamelési DataFrame-et ad vissza egy felhasználó által megadott lekérdezésből. |
name |
str |
A tábla neve. Ha nincs megadva, alapértelmezés szerint a függvény neve lesz. |
comment |
str |
A táblázat leírása. |
spark_conf |
dict |
A lekérdezés végrehajtásához szükséges Spark-konfigurációk listája |
table_properties |
dict |
A dicttáblázati tulajdonságok halmaza a táblázathoz. |
path |
str |
A táblaadatok tárolási helye. Ha nincs beállítva, használja a táblát tartalmazó séma felügyelt tárolási helyét. |
partition_cols |
list |
A tábla particionálásához használandó egy vagy több oszlop listája. |
cluster_by_auto |
bool |
Automatikus folyadékklaszterezés engedélyezése a táblázatban. Ez kombinálható a cluster_by elemmel, és meghatározhatja az oszlopokat, amelyeket kezdeti fürtözési kulcsként kíván használni, majd a számítási terhelés alapján történő monitorozással és automatikus kulcskijelölési frissítésekkel követhető. Lásd: Automatikus folyadékfürtözés. |
cluster_by |
list |
Engedélyezze a "liquid clustering" funkciót a táblában, és határozza meg a fürtözési kulcsként használni kívánt oszlopokat. Lásd: Táblákhoz folyékony klaszterezés használata. |
schema |
str vagy StructType |
A tábla sémadefiníciója. A sémák definiálhatók SQL DDL karakterláncként vagy Python StructType-ként. |
private |
bool |
Hozzon létre egy táblát, de ne tegye közzé a táblát a metaadattárban. Ez a tábla elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A privát táblák a csővezeték teljes élettartama alatt megőrződnek. Az alapértelmezett érték a False.A privát táblákat korábban a temporary paraméterrel hozták létre. |
row_filter |
str |
(Nyilvános előzetes verzió) A záradék a sorok szűrésére a táblában. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal. |
A séma megadása nem kötelező, és a PySparkkal StructType vagy az SQL DDL-vel is elvégezhető. Séma megadásakor opcionálisan tartalmazhat generált oszlopokat, oszlopmaszkokat, valamint elsődleges és idegen kulcsokat. See:
- Delta Lake által létrehozott oszlopok
- Az Azure Databricks korlátozásai
- Táblázat közzététele sorszűrőkkel és oszlopmaszkokkal.
Példák
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")