Referencia del lenguaje Python de Delta Live Tables

En este artículo se proporcionan detalles de la interfaz de programación de Python de Delta Live Tables.

Para obtener información sobre la API de SQL, consulte Referencia del lenguaje SQL de Delta Live Tables.

Para obtener más información específica sobre cómo configurar el cargador automático, consulte ¿Qué es el cargador automático?.

Limitaciones

La interfaz de Python para Delta Live Tables tiene las siguientes limitaciones:

  • Las funciones table y view de Python deben devolver un objeto DataFrame. Algunas funciones que funcionan en objetos DataFrame no devuelven objetos DataFrame y no deben usarse. Dado que las transformaciones DataFrame se ejecutan después de que se haya resuelto el grafo de flujo de datos completo, el uso de estas operaciones podría tener efectos secundarios no deseados. Estas operaciones incluyen funciones como collect(), count(), toPandas(), save() y saveAsTable(). Sin embargo, puede incluir estas funciones fuera de las definiciones de función table o view porque este código se ejecuta una vez durante la fase de inicialización del grafo.
  • No se admite el uso de la función pivot(). La operación pivot en Spark requiere una carga diligente de los datos de entrada para calcular el esquema de la salida. Esta funcionalidad no se admite en Delta Live Tables.

Importar el módulo Python dlt

Las funciones de Python de Delta Live Tables se definen en el módulo dlt. Las canalizaciones implementadas con la API de Python deben importar este módulo:

import dlt

Crear una vista materializada de Delta Live Tables o una tabla de streaming

En Python, Delta Live Tables determina si se debe actualizar un conjunto de datos como una vista materializada o una tabla de streaming basada en la consulta definida. El decorador @table se usa para definir vistas materializadas y tablas de streaming.

Para definir una vista materializada en Python, aplique @table a una consulta que realice una lectura estática en un origen de datos. Para definir una tabla de streaming, aplique @table a una consulta que realice una lectura de streaming en un origen de datos. Ambos tipos de conjunto de datos tienen la misma especificación de sintaxis que se indica a continuación:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Crear una vista Delta Live Tables

Para definir una vista en Python, aplique el decorador @view. Al igual que el decorador @table, puede usar vistas en Delta Live Tables para conjuntos de datos estáticos o de streaming. A continuación se muestra la sintaxis para definir vistas con Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Ejemplo: definición de tablas y vistas

Para definir una tabla o vista en Python, aplique el decorador @dlt.view o @dlt.table a una función. Puede usar el nombre de la función o el parámetro name para asignar el nombre de la tabla o vista. En el ejemplo siguiente se definen dos conjuntos de datos diferentes: una vista denominada taxi_raw que toma un archivo JSON como origen de entrada y una tabla denominada filtered_data que toma la vista taxi_raw como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Ejemplo: acceso a un conjunto de datos definido en la misma canalización

Además de leer desde orígenes de datos externos, puede acceder a los conjuntos de datos definidos en la misma canalización con la función read() de Delta Live Tables. En el ejemplo siguiente se muestra cómo crear un conjunto de datos customers_filtered mediante la función read():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

También puede usar la función spark.table() para acceder a un conjunto de datos definido en la misma canalización. Al usar la función spark.table() para acceder a un conjunto de datos definido en la canalización, en el argumento de la función anteponga la palabra clave LIVE al nombre del conjunto de datos:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Ejemplo: lectura de una tabla registrada en un metastore

Para leer datos de una tabla registrada en el metastore de Hive, en el argumento de la función omita la palabra clave LIVE y, opcionalmente, califique el nombre de la tabla con el nombre de la base de datos:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Para obtener un ejemplo de lectura desde una tabla de Unity Catalog, consulte Ingerir datos en una canalización de Unity Catalog.

Ejemplo: acceso a un conjunto de datos mediante spark.sql

También puede devolver un conjunto de datos mediante una expresión spark.sql en una función de consulta. Para leer desde un conjunto de datos interno, anteponga LIVE. al nombre del conjunto de datos:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Creación de una tabla que se usará como destino de las operaciones de streaming

Use la función create_streaming_table() para crear una tabla de destino para los registros de salida mediante operaciones de streaming, incluidos apply_changes() y registros de salida de @append_flow.

Nota:

Las funciones create_target_table() y create_streaming_live_table() están en desuso. Databricks recomienda actualizar el código existente para usar la función create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumentos
name

Tipo: str

El nombre de la tabla.

Este parámetro es obligatorio.
comment

Tipo: str

Descripción opcional de la tabla.
spark_conf

Tipo: dict

Lista opcional de configuraciones de Spark para la ejecución de esta consulta.
table_properties

Tipo: dict

Lista opcional de propiedades de la tabla.
partition_cols

Tipo: array

Lista opcional de una o varias columnas que se usarán para crear particiones de la tabla.
path

Tipo: str

Ubicación de almacenamiento opcional para los datos de la tabla. Si no se establece, el sistema establecerá de manera predeterminada la ubicación de almacenamiento de la canalización.
schema

Tipo: str o StructType

Definición de esquema opcional para la tabla. Los esquemas se pueden definir como una cadena de DDL de SQL o con un objeto Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Tipo: dict

Restricciones de calidad de datos opcionales para la tabla. Consulte varias expectativas.

Controlar cómo se materializan las tablas

Las tablas también ofrecen un control adicional de su materialización:

Nota:

En el caso de las tablas de menos de 1 TB de tamaño, Databricks recomienda permitir que Delta Live Tables controle la organización de datos. A menos que espere que la tabla crezca más allá de un terabyte, normalmente no debe especificar columnas de partición.

Ejemplo: especificar un esquema y columnas de partición

Opcionalmente, puede especificar un esquema de tabla mediante un objeto StructType de Python o una cadena de DDL de SQL. La a definición puede incluir columnas generadascuando se especifica con una cadena DDL.

En el ejemplo siguiente se crea una tabla denominada sales con un esquema especificado mediante un Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

En el ejemplo siguiente se especifica el esquema de una tabla mediante una cadena DDL, se define una columna generada y se define una columna de partición:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

De manera predeterminada, Delta Live Tables deduce el esquema de la definición de tablesi no se especifica ningún esquema.

Configurar una tabla de streaming para omitir los cambios en una tabla de streaming de origen

Nota:

  • La marca skipChangeCommits solo funciona con spark.readStream mediante el uso de la función option(). No puede usar esta marca en una función dlt.read_stream().
  • No se puede usar la marca skipChangeCommits cuando la tabla de streaming de origen se define como destino de una función apply_changes().

De forma predeterminada, las tablas de streaming requieren orígenes de solo anexión. Cuando una tabla de streaming usa otra tabla de streaming como origen y la tabla de streaming de origen requiere actualizaciones o eliminaciones, por ejemplo, el procesamiento del "derecho al olvido" del RGPD, la marca skipChangeCommits se puede establecer al leer la tabla de streaming de origen para omitir esos cambios. Para obtener más información sobre esta marca, consulte Omitir actualizaciones y eliminaciones.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Propiedades de Delta Live Tables de Python

En las tablas siguientes se describen las opciones y propiedades que puede especificar al definir tablas y vistas con Delta Live Tables:

@table o @view
name

Tipo: str

Nombre opcional de la tabla o la vista. Si no está definido, el nombre de la función se usa como nombre de la tabla o la vista.
comment

Tipo: str

Descripción opcional de la tabla.
spark_conf

Tipo: dict

Lista opcional de configuraciones de Spark para la ejecución de esta consulta.
table_properties

Tipo: dict

Lista opcional de propiedades de la tabla.
path

Tipo: str

Ubicación de almacenamiento opcional para los datos de la tabla. Si no se establece, el sistema establecerá de manera predeterminada la ubicación de almacenamiento de la canalización.
partition_cols

Tipo: a collection of str

Recopilación opcional, por ejemplo un list de una o varias columnas que se usarán para crear particiones de la tabla.
schema

Tipo: str o StructType

Definición de esquema opcional para la tabla. Los esquemas se pueden definir como una cadena de DDL de SQL o con un objeto Python
StructType.
temporary

Tipo: bool

Cree una tabla, pero no publique los metadatos de la tabla. La palabra clave temporary indica a las tablas Delta Live que creen una tabla que esté disponible para la canalización, pero que no se debe tener acceso fuera de la canalización. Para reducir el tiempo de procesamiento, una tabla temporal persiste durante la duración del pipeline que la crea y no solo una actualización.

El valor predeterminado es False.
Definición de tabla o vista
def <function-name>()

Función de Python que define el conjunto de datos. Si no se establece el parámetro name, se usa <function-name> como nombre del conjunto de datos de destino.
query

Instrucción de Spark SQL que devuelve un DataFrame de Koalas o un conjunto de datos de Spark.

Use dlt.read() o spark.table() para realizar una lectura completa desde un conjunto de datos definido en la misma canalización. Al usar la función spark.table() para leer desde un conjunto de datos definido en la misma canalización, anteponga la palabra clave LIVE al nombre del conjunto de datos en el argumento de la función. Por ejemplo, para leer desde un conjunto de datos denominado customers:

spark.table("LIVE.customers")

También puede usar la función spark.table() para leer desde una tabla registrada en el metastore; para ello, omita la palabra clave LIVE y, opcionalmente, califique el nombre de la tabla con el nombre de la base de datos:

spark.table("sales.customers")

Use dlt.read_stream() para realizar una lectura de streaming desde un conjunto de datos definido en la misma canalización.

Use la función spark.sql a fin de definir una consulta SQL para crear el conjunto de datos devuelto.

Use la sintaxis de PySpark para definir consultas de Delta Live Tables con Python.
Expectativas
@expect("description", "constraint")

Declarar una restricción de calidad de datos identificada mediante
description. Si una fila infringe la expectativa, incluya la fila en el conjunto de datos de destino.
@expect_or_drop("description", "constraint")

Declarar una restricción de calidad de datos identificada mediante
description. Si una fila infringe la expectativa, anule la fila en el conjunto de datos de destino.
@expect_or_fail("description", "constraint")

Declarar una restricción de calidad de datos identificada mediante
description. Si una fila infringe la expectativa, detenga inmediatamente la ejecución.
@expect_all(expectations)

Declarar una o varias restricciones de calidad de datos.
expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe algunas de las expectativas, incluya la fila en el conjunto de datos de destino.
@expect_all_or_drop(expectations)

Declarar una o varias restricciones de calidad de datos.
expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe alguna de las expectativas, anule la fila en el conjunto de datos de destino.
@expect_all_or_fail(expectations)

Declarar una o varias restricciones de calidad de datos.
expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe alguna de las expectativas, detenga inmediatamente la ejecución.

Captura de datos modificados con Python en Delta Live Tables

Con la función apply_changes() de la Python API podrá usar la funcionalidad de captura de datos modificados de Delta Live Tables. La interfaz de Python delta Live Tables también proporciona la función create_streaming_table(). Esta función se puede usar para crear la tabla de destino que requiere la función apply_changes().

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Nota:

El comportamiento predeterminado de los eventos INSERT y UPDATE es actualizar/insertar (upsert) los eventos de captura de datos modificados desde el origen: actualizar las filas de la tabla de destino que coincidan con las claves especificadas o insertar una fila nueva cuando no exista un registro coincidente en la tabla de destino. El control de los eventos DELETE se especifica con la condición APPLY AS DELETE WHEN.

Importante

Debe declarar una tabla de streaming de destino en la que aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la tabla de destino apply_changes, también debe incluir las columnas __START_AT y __END_AT con el mismo tipo de datos que el campo sequence_by.

Consulte APPLY CHANGES en la API: simplificar la captura de datos de cambios en Delta Live Tables.

Argumentos
target

Tipo: str

Nombre de la tabla que se va a actualizar. Puede usar la función create_streaming_table() para crear la tabla de destino antes de ejecutar la función apply_changes().

Este parámetro es obligatorio.
source

Tipo: str

Origen de datos que contiene los registros de captura de datos modificados.

Este parámetro es obligatorio.
keys

Tipo: list

Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos de captura de datos modificados se aplican a registros específicos de la tabla de destino.

Puede especificar:

* Una lista de cadenas: ["userId", "orderId"]
* Una lista de funciones col() de Spark SQL: [col("userId"), col("orderId"]

Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no col(source.userId).

Este parámetro es obligatorio.
sequence_by

Tipo: str o col()

Nombre de columna que especifica el orden lógico de los eventos de captura de datos modificados en los datos de origen. Delta Live Tables usa esta secuenciación para controlar los eventos de cambio que llegan desordenados.

Puede especificar:

* Una cadena: "sequenceNum"
* Una función col() de Spark SQL: col("sequenceNum")

Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no col(source.userId).

Este parámetro es obligatorio.
ignore_null_updates

Tipo: bool

Permite la ingesta de actualizaciones que contengan un subconjunto de las columnas de destino. Cuando un evento de captura de datos modificados coincide con una fila existente y ignore_null_updates es True, las columnas con null conservan sus valores existentes en el destino. Esto también se aplica a las columnas anidadas con un valor de null. Cuando ignore_null_updates es False, los valores existentes se sobrescriben con valores null.

Este parámetro es opcional.

El valor predeterminado es False.
apply_as_deletes

Tipo: str o expr()

Especifica cuándo se debe tratar un evento de captura de datos modificados como DELETE en lugar de como upsert. Para controlar los datos desordenado, la fila eliminada se conserva temporalmente como marcador de exclusión en la tabla de Delta subyacente y se crea una vista en el metastore que filtra estos marcadores. El intervalo de retención se configura con la
pipelines.cdc.tombstoneGCThresholdInSecondspropiedad de tabla.

Puede especificar:

* Una cadena: "Operation = 'DELETE'"
* Una función expr() de Spark SQL: expr("Operation = 'DELETE'")

Este parámetro es opcional.
apply_as_truncates

Tipo: str o expr()

Especifica cuándo se debe tratar un evento de captura de datos modificados como TRUNCATE de tabla completa. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieran esta funcionalidad.

El parámetro apply_as_truncates solo se admite para el tipo SCD 1. El tipo SCD 2 no admite truncamiento.

Puede especificar:

* Una cadena: "Operation = 'TRUNCATE'"
* Una función expr() de Spark SQL: expr("Operation = 'TRUNCATE'")

Este parámetro es opcional.
column_list

except_column_list

Tipo: list

Subconjunto de columnas que se incluirán en la tabla de destino. Use column_list para especificar la lista completa de columnas que se incluirán. Use except_column_list para especificar las columnas que se excluirán. Puede declarar los valores como lista de cadenas o como funciones col() de Spark SQL:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no col(source.userId).

Este parámetro es opcional.

La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se pasa ningún argumento column_list o except_column_list a la función.
stored_as_scd_type

Tipo: str o int

Indica si se van a almacenar los registros como SCD de tipo 1 o SCD de tipo 2.

Establezca el valor en 1 para SCD de tipo 1 o en 2 para SCD de tipo 2.

Esta cláusula es opcional.

El valor predeterminado es SCD de tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Use track_history_column_list para especificar la lista completa de columnas de las que se va a realizar el seguimiento. Usar
track_history_except_column_list para especificar las columnas que se excluirán del seguimiento. Puede declarar los valores como lista de cadenas o como funciones col() de Spark SQL: - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no col(source.userId).

Este parámetro es opcional.

La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se pasa ningún argumento track_history_column_list o
track_history_except_column_list a la función.