Compartir a través de


Ejecute su primera carga de trabajo de flujo estructurado

En este artículo, se proporcionan ejemplos de código y una explicación de los conceptos básicos necesarios para ejecutar las primeras consultas de flujo estructurado de Azure Databricks. Puede usar flujos estructurados para cargas de trabajo de procesamiento casi en tiempo real e incrementales.

Los flujos estructurados son una de las diversas tecnologías que potencian las tablas de streaming en Delta Live Tables. Databricks recomienda usar Delta Live Tables para todas las nuevas cargas de trabajo de ETL, ingesta y flujo estructurado. Consulte ¿Qué es Delta Live Tables?

Nota:

Aunque Delta Live Tables proporciona una sintaxis ligeramente modificada para declarar tablas de streaming, la sintaxis general para configurar las transformaciones y lecturas de streaming se aplica a todos los casos de uso de streaming de Azure Databricks. Delta Live Tables también simplifica el streaming mediante la administración de la información de estado, los metadatos y numerosas configuraciones.

Uso del cargador automático para leer datos de streaming desde el almacenamiento de objetos

En el ejemplo siguiente, se muestra cómo cargar datos JSON con el cargador automático, que usa cloudFiles para indicar el formato y las opciones. La opción schemaLocation habilita la inferencia y la evolución del esquema. Pegue el código siguiente en una celda del cuaderno de Databricks y ejecute la celda para crear un DataFrame de streaming denominado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Al igual que otras operaciones de lectura de Azure Databricks, la configuración de una lectura de streaming no carga realmente los datos. Es necesario desencadenar una acción en los datos antes de que comience la transmisión.

Nota:

Llamar a display() en un DataFrame de streaming inicia un trabajo de streaming. Para la mayoría de los casos de uso de flujo estructurado, la acción que desencadena la transmisión debería escribir datos en un receptor. Consulte Consideraciones de producción para Structured Streaming.

Realización de una transformación de streaming

El flujo estructurado admite la mayoría de las transformaciones disponibles en Azure Databricks y Spark SQL. Incluso es posible cargar modelos de MLflow como UDF y realizar predicciones de streaming como transformación.

En el ejemplo de código siguiente se completa una transformación sencilla para enriquecer los datos JSON ingeridos con información adicional mediante funciones de Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

El transformed_df resultante contiene instrucciones de consulta para cargar y transformar cada registro a medida que llega al origen de datos.

Nota:

El flujo estructurado trata los orígenes de datos como conjuntos de datos ilimitados o infinitos. Por lo tanto, algunas transformaciones no se admiten en cargas de trabajo de flujo estructurado porque requerirían ordenar un número infinito de elementos.

La mayoría de las agregaciones y muchas combinaciones requieren la administración de información de estado con marcas de agua, ventanas y modo de salida. Consulte Aplicación de marcas de agua para controlar los umbrales de procesamiento de datos.

Realización de una escritura por lotes incremental en Delta Lake

En el ejemplo siguiente, se escribe en Delta Lake mediante un punto de control y una ruta de acceso de archivo especificados.

Importante

Asegúrese siempre de especificar una ubicación de punto de control única para cada escritor de streaming que configure. El punto de control proporciona la identidad única de la transmisión, realizando un seguimiento de todos los registros procesados y la información de estado asociada a la consulta de streaming.

La configuración de availableNow para el desencadenador indica al flujo estructurado que procese todos los registros no procesados previamente del conjunto de datos de origen y, a continuación, se apague, por lo que puede ejecutar de forma segura el código siguiente sin preocuparse por dejar una secuencia en ejecución:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

En este ejemplo, no llegan nuevos registros al origen de datos, por lo que la ejecución repetida de este código no ingiere nuevos registros.

Advertencia

La ejecución de flujo estructurado puede impedir que la terminación automática apague los recursos de proceso. Para evitar costes inesperados, asegúrese de finalizar las consultas de streaming.

Leer datos de Delta Lake, transformar y escribir en Delta Lake

Delta Lake tiene una amplia compatibilidad para trabajar con flujo estructurado como origen y receptor. Consulte Lecturas y escrituras en streaming de tablas delta.

En el ejemplo siguiente, se muestra una sintaxis de ejemplo para cargar incrementalmente todos los registros nuevos de una tabla Delta, combinarlos con una instantánea de otra tabla Delta y escribirlos en una tabla Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Es necesario tener los permisos adecuados configurados para leer las tablas de origen y escribir en las tablas de destino, así como la ubicación de punto de control especificada. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores.

Nota:

Delta Live Tables proporciona una sintaxis totalmente declarativa para crear canalizaciones de Delta Lake y administra propiedades, como desencadenadores y puntos de control automáticamente. Consulte ¿Qué es Delta Live Tables?

Leer datos de Kafka, transformar y escribir en Kafka

Apache Kafka y otros buses de mensajería proporcionan algunas de las latencias más bajas disponibles para grandes conjuntos de datos. Use Azure Databricks para aplicar transformaciones a los datos ingeridos desde Kafka y, a continuación, vuelva a escribir datos en Kafka.

Nota:

La escritura de datos en el almacenamiento de objetos en la nube agrega una sobrecarga de latencia adicional. Si desea almacenar datos de un bus de mensajería en Delta Lake, pero requiere la menor latencia posible para las cargas de trabajo de streaming, Databricks recomienda configurar trabajos de streaming independientes para ingerir datos en el almacén de lago y aplicar transformaciones casi en tiempo real para receptores de bus de mensajería descendente.

En el ejemplo de código siguiente, se muestra un patrón sencillo para enriquecer datos de Kafka al combinarlos con datos de una tabla Delta y, a continuación, volver a escribir en Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Es necesario tener los permisos adecuados configurados para acceder al servicio Kafka. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores. Consulte Procesamiento de flujos con Apache Kafka y Azure Databricks.