Compartir vía


Funciones para definir conjuntos de datos

El pyspark.pipelines módulo (aquí con alias como dp) implementa gran parte de su funcionalidad principal mediante decoradores. Estos decoradores aceptan una función que define una consulta en modo batch o de streaming y devuelve un DataFrame de Apache Spark. La sintaxis siguiente muestra un ejemplo sencillo para definir un conjunto de datos de canalización:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

En esta página se proporciona información general sobre las funciones y consultas que definen conjuntos de datos en canalizaciones. Para obtener una lista completa de los decoradores disponibles, consulte Referencia para desarrolladores de canalizaciones.

Las funciones que se usan para definir conjuntos de datos no deben incluir lógica arbitraria de Python no relacionada con el conjunto de datos, incluidas las llamadas a api de terceros. Las canalizaciones ejecutan estas funciones varias veces durante el planeamiento, la validación y las actualizaciones. La inclusión de lógica arbitraria puede dar lugar a resultados inesperados.

Leer datos para comenzar una definición de conjunto de datos

Las funciones que se usan para definir conjuntos de datos del pipeline suelen comenzar con una operación spark.read o spark.readStream. Estas operaciones de lectura devuelven un objeto DataFrame estático o de streaming que se usa para definir transformaciones adicionales antes de devolver el dataframe. Otros ejemplos de operaciones de Spark que devuelven un dataframe incluyen spark.table, o spark.range.

Las funciones nunca deben hacer referencia a dataframes definidos fuera de la función. El intento de hacer referencia a dataframes definidos en un ámbito diferente podría dar lugar a un comportamiento inesperado. Para obtener un ejemplo de un patrón de metaprogramación para crear varias tablas, consulte Creación de tablas en un for bucle.

En los ejemplos siguientes se muestra la sintaxis básica para leer datos mediante lógica de streaming o por lotes:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Si necesita leer datos de una API REST externa, implemente esta conexión mediante un origen de datos personalizado de Python. Consulte Orígenes de datos personalizados de PySpark.

Nota:

Es posible crear Apache Spark DataFrames arbitrarios a partir de colecciones de datos de Python, incluyendo Pandas DataFrames, dicts y listas. Estos patrones pueden ser útiles durante el desarrollo y las pruebas, pero la mayoría de las definiciones de conjuntos de datos de canalización de producción deben comenzar cargando datos de archivos, un sistema externo o una tabla o vista existente.

Encadenamiento de transformaciones

Las canalizaciones admiten casi todas las transformaciones DataFrame de Apache Spark. Puede incluir cualquier número de transformaciones en la función de definición del conjunto de datos, pero debe asegurarse de que los métodos que use siempre devuelvan un objeto DataFrame.

Si tiene una transformación intermedia que dirige varias cargas de trabajo descendentes, pero no necesita materializarla como tabla, use @dp.temporary_view() para agregar una vista temporal al pipeline. A continuación, puede hacer referencia a esta vista mediante spark.read.table("temp_view_name") en varias definiciones de conjuntos de datos posteriores. La sintaxis siguiente muestra este patrón:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Esto garantiza que el pipeline tenga conocimiento total de las transformaciones en tu vista durante la planificación del pipeline y evite posibles problemas relacionados con el código arbitrario de Python que se ejecuta fuera de las definiciones del dataset.

Dentro de la función, puede encadenar DataFrames para crear nuevos DataFrames sin escribir resultados incrementales como vistas, vistas materializadas o tablas de streaming, como en el siguiente ejemplo.

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Si todos sus DataFrames realizan sus lecturas iniciales usando lógica por lotes, el resultado es un DataFrame estático. Si tiene alguna consulta que sea streaming, el resultado devuelto es un dataframe de streaming.

Devolver un dataframe

Use @dp.table para crear una tabla de streaming a partir de los resultados de una lectura de streaming. Use @dp.materialized_view para crear una vista materializada a partir de los resultados de una lectura por lotes. La mayoría de los demás decoradores funcionan con ambos tipos de DataFrames, ya sean de streaming o estáticos, mientras que algunos requieren un DataFrame de streaming.

La función que se usa para definir un conjunto de datos debe devolver un DataFrame de Spark. Nunca use métodos que guarden o escriban en archivos o tablas como parte del código del conjunto de datos de canalización.

Ejemplos de operaciones de Apache Spark que nunca se deben usar en el código de canalización:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Nota:

Las canalizaciones también admiten el uso de Pandas en Spark para funciones de definición de conjuntos de datos. Consulte Pandas API en Spark.

Uso de SQL en una canalización de Python

PySpark admite el spark.sql operador para escribir código DataFrame mediante SQL. Cuando se usa este patrón en el código fuente de la canalización, se compila en vistas materializadas o tablas de streaming.

El ejemplo de código siguiente es equivalente al uso spark.read.table("catalog_name.schema_name.table_name") para la lógica de consulta del conjunto de datos:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read y dlt.read_stream (legado)

El módulo anterior dlt incluye dlt.read() y dlt.read_stream() funciones que se introdujeron para admitir la funcionalidad en el modo de publicación de línea de proceso heredada. Estos métodos se admiten, pero Databricks recomienda usar siempre las spark.read.table() funciones y spark.readStream.table() debido a lo siguiente:

  • Las dlt funciones tienen compatibilidad limitada con la lectura de conjuntos de datos definidos fuera de la canalización actual.
  • Las spark funciones admiten la especificación de opciones, como skipChangeCommits, para leer las operaciones. Las funciones no admiten la dlt especificación de opciones.
  • El dlt módulo se ha reemplazado por el pyspark.pipelines módulo. Databricks recomienda usar from pyspark import pipelines as dp para importar pyspark.pipelines cuando se escribe código de canalizaciones en Python.