Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
El @table decorador se puede usar para definir tablas de transmisión en una canalización.
Para definir una tabla de streaming, aplique @table a una consulta que realice una lectura de streaming en un origen de datos o use la función create_streaming_table().
Nota:
En el módulo anterior dlt , el @table operador se usó para crear tablas de streaming y vistas materializadas. El @table operador del pyspark.pipelines módulo sigue funcionando de esta manera, pero Databricks recomienda usar el @materialized_view operador para crear vistas materializadas.
Syntax
from pyspark import pipelines as dp
@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = <bool>,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = <bool>)
@dp.expect(...)
def <function-name>():
return (<query>)
Parámetros
@dp.expect() es una cláusula opcional de expectativa de canalizaciones declarativas de Spark de Lakeflow. Puede incluir varias expectativas. Consulte Expectativas.
| Parámetro | Tipo | Description |
|---|---|---|
| función | function |
Obligatorio. Función que devuelve un DataFrame de streaming de Apache Spark desde una consulta definida por el usuario. |
name |
str |
El nombre de la tabla. Si no se proporciona, el valor predeterminado es el nombre de la función. |
comment |
str |
Una descripción de la tabla. |
spark_conf |
dict |
Lista de configuraciones de Spark para la ejecución de esta consulta |
table_properties |
dict |
Una dict de las propiedades de la tabla para la tabla. |
path |
str |
Una ubicación de almacenamiento para los datos de tabla. Si no se establece, use la ubicación de almacenamiento administrada para el esquema que contiene la tabla. |
partition_cols |
list |
Lista de una o varias columnas que se van a usar para crear particiones en la tabla. |
cluster_by_auto |
bool |
Habilite la agrupación automática de líquidos en la tabla. Esto se puede combinar con cluster_by y definir las columnas que se van a usar como claves de agrupación en clústeres iniciales, seguidas de las actualizaciones de supervisión y selección automática de claves en función de la carga de trabajo. Consulte Agrupación automática de líquidos. |
cluster_by |
list |
Habilite la agrupación en clústeres líquidos en la tabla y defina las columnas que se usarán como claves de agrupación en clústeres. Consulte Uso de clústeres líquidos para tablas. |
schema |
str o StructType |
Definición de esquema para la tabla. Los esquemas se pueden definir como una cadena de DDL de SQL o con un StructType de Python. |
private |
bool |
Cree una tabla, pero no publique la tabla en el metastore. Esa tabla está disponible en la canalización, pero no es accesible fuera de la canalización. Las tablas privadas persisten durante la vigencia de la canalización. El valor predeterminado es False.Las tablas privadas se crearon anteriormente con el temporary parámetro . |
row_filter |
str |
(Versión preliminar pública) Una cláusula de filtro de fila para la tabla. Vea Publicación de tablas con filtros de fila y máscaras de columna. |
Especificar un esquema es opcional y se puede realizar con PySpark StructType o SQL DDL. Al especificar un esquema, puede incluir opcionalmente columnas generadas, máscaras de columna y claves principales y externas. Vea:
- Columnas generadas por Delta Lake
- Restricciones en Azure Databricks
- Publique tablas con filtros de fila y máscaras de columna.
Examples
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")