Nota
L'accés a aquesta pàgina requereix autorització. Pots provar d'iniciar sessió o canviar de directori.
L'accés a aquesta pàgina requereix autorització. Pots provar de canviar directoris.
En este artículo, se proporcionan ejemplos de código y una explicación de los conceptos básicos necesarios para ejecutar las primeras consultas de flujo estructurado de Azure Databricks. Puede usar Structured Streaming (flujos estructurados) para cargas de trabajo de procesamiento casi en tiempo real e incrementales.
Structured Streaming es una de las varias tecnologías que potencia las tablas de streaming en las Canalizaciones Declarativas de Lakeflow Spark. Databricks recomienda utilizar las Canalizaciones Declarativas de Lakeflow Spark para todas las nuevas cargas de trabajo de ETL, ingesta y Structured Streaming. Consulte Canalizaciones declarativas de Spark de Lakeflow.
Nota:
Aunque Lakeflow Spark Declarative Pipelines proporciona una sintaxis ligeramente modificada para declarar tablas de streaming, la sintaxis general para configurar las transformaciones y lecturas de streaming se aplica a todos los casos de uso de streaming en Azure Databricks. Las canalizaciones declarativas de Spark de Lakeflow también simplifican el streaming mediante la administración de información de estado, metadatos y numerosas configuraciones.
Uso del cargador automático para leer datos de streaming desde el almacenamiento de objetos
En el ejemplo siguiente, se muestra cómo cargar datos JSON con el cargador automático, que usa cloudFiles para indicar el formato y las opciones. La opción schemaLocation habilita la inferencia y la evolución del esquema. Pegue el código siguiente en una celda del cuaderno de Databricks y ejecute la celda para crear un DataFrame de streaming denominado raw_df:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Al igual que otras operaciones de lectura de Azure Databricks, la configuración de una lectura de streaming no carga realmente los datos. Es necesario desencadenar una acción en los datos antes de que comience la transmisión.
Nota:
Llamar a display() en un DataFrame de streaming inicia un trabajo de streaming. Para la mayoría de los casos de uso de flujo estructurado, la acción que desencadena la transmisión debería escribir datos en un receptor. Consulte Consideraciones de producción para Structured Streaming.
Realización de una transformación de streaming
El Structured Streaming admite la mayoría de las transformaciones disponibles en Azure Databricks y Spark SQL. Incluso es posible cargar modelos de MLflow como UDF y realizar predicciones de streaming como transformación.
En el ejemplo de código siguiente se completa una transformación sencilla para enriquecer los datos JSON ingeridos con información adicional mediante funciones de Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
El transformed_df resultante contiene instrucciones de consulta para cargar y transformar cada registro a medida que llega al origen de datos.
Nota:
El flujo estructurado trata los orígenes de datos como conjuntos de datos ilimitados o infinitos. Por lo tanto, algunas transformaciones no se admiten en cargas de trabajo de flujo estructurado porque requerirían ordenar un número infinito de elementos.
La mayoría de las agregaciones y muchas combinaciones requieren la administración de información de estado con marcas de agua, ventanas y modo de salida. Consulte Aplicar marcas de agua para controlar los umbrales de procesamiento de datos.
Ejecutar una escritura incremental por lotes en Delta Lake
En el siguiente ejemplo, escribe en Delta Lake usando una ruta de archivo y un punto de control especificados.
Importante
Asegúrese siempre de especificar una ubicación de punto de control única para cada escritor de streaming que configure. El punto de control proporciona la identidad única de la transmisión, realizando un seguimiento de todos los registros procesados y la información de estado asociada a la consulta de streaming.
La configuración de availableNow para el desencadenador indica a Structured Streaming que procese todos los registros no procesados previamente del conjunto de datos de origen y luego se detenga, por lo que puede ejecutar el siguiente código de forma segura sin preocuparse por dejar un flujo en ejecución.
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
En este ejemplo, no llegan nuevos registros al origen de datos, por lo que la ejecución repetida de este código no ingiere nuevos registros.
Advertencia
La ejecución de flujo estructurado puede impedir que la terminación automática apague los recursos de proceso. Para evitar costes inesperados, asegúrese de finalizar las consultas de streaming.
Leer datos de Delta Lake, transformarlos y escribirlos en Delta Lake
Delta Lake ofrece un amplio soporte para trabajar con Streaming Estructurado como tanto origen como destino. Consulte Lecturas y escrituras en streaming de tablas delta.
En el ejemplo siguiente, se muestra una sintaxis de ejemplo para cargar incrementalmente todos los registros nuevos de una tabla Delta, combinarlos con una instantánea de otra tabla Delta y escribirlos en una tabla Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Es necesario tener los permisos adecuados configurados para leer las tablas de origen y escribir en las tablas de destino, así como la ubicación de punto de control especificada. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores.
Nota:
Lakeflow Spark Declarative Pipelines proporciona una sintaxis declarativa completa para crear canalizaciones de Delta Lake y administra propiedades como desencadenadores y puntos de control automáticamente. Consulte Canalizaciones declarativas de Spark de Lakeflow.
Leer datos de Kafka, transformar y escribir en Kafka
Apache Kafka y otros buses de mensajería proporcionan algunas de las latencias más bajas disponibles para grandes conjuntos de datos. Use Azure Databricks para aplicar transformaciones a los datos ingeridos desde Kafka y, a continuación, vuelva a escribir datos en Kafka.
Nota:
La escritura de datos en el almacenamiento de objetos en la nube agrega una sobrecarga de latencia adicional. Si desea almacenar datos de un bus de mensajería en Delta Lake, pero requiere la menor latencia posible para las cargas de trabajo de streaming, Databricks recomienda configurar trabajos de streaming independientes para ingerir datos en el almacén de lago y aplicar transformaciones casi en tiempo real para receptores de bus de mensajería descendente.
En el ejemplo de código siguiente, se muestra un patrón sencillo para enriquecer datos de Kafka al combinarlos con datos de una tabla Delta y, a continuación, volver a escribir en Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Es necesario tener los permisos adecuados configurados para acceder al servicio Kafka. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores. Consulte Procesamiento de flujos con Apache Kafka y Azure Databricks.