Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta página se describe cómo puede usar Apache Kafka como origen o receptor al ejecutar cargas de trabajo de Structured Streaming en Azure Databricks.
Para más información sobre Kafka, consulte la documentación de Apache Kafka.
Leer datos desde Kafka
Use el kafka formato para configurar conexiones a Kafka. A continuación se muestra un ejemplo de lectura de streaming:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks también admite lecturas por lotes de Kafka, como en el ejemplo siguiente:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Para la carga por lotes incremental, Databricks recomienda usar Kafka con Trigger.AvailableNow. Consulte AvailableNow: Procesamiento por lotes incremental.
En Databricks Runtime 13.3 LTS y versiones posteriores, Azure Databricks también proporciona una función SQL para leer datos de Kafka. El streaming con SQL solo es compatible en las canalizaciones declarativas de Lakeflow Spark o con tablas de streaming en Databricks SQL. Consulte read_kafka función con valores de tabla.
Configuración del lector de Structured Streaming de Kafka
Para las consultas por lotes y streaming, debe establecer los servidores de arranque para el origen de Kafka con la siguiente opción:
| Clave | Importancia | Descripción |
|---|---|---|
kafka.bootstrap.servers |
Una lista separada por comas de host:port | Los servidores de arranque del clúster de Kafka |
Para establecer temas de suscripción, debe especificar una de las siguientes opciones:
| Opción | Importancia | Descripción |
|---|---|---|
subscribe |
Lista de temas separados por comas. | Lista de temas a la que suscribirse. |
subscribePattern |
Cadena de expresión regular de Java. | Patrón utilizado para suscribirse a los temas. |
assign |
Cadena JSON {"topicA":[0,1],"topic":[2,4]}. |
Específico topicPartitions para consumir. |
Consulte la página Opciones para obtener la lista completa de opciones disponibles.
Esquema para filas de Kafka
El lector de Kafka Structured Streaming devuelve filas con el esquema siguiente:
| Columna | Tipo |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
key y value siempre se deserializan como matrices de bytes con ByteArrayDeserializer. Use operaciones DataFrame (como cast("string") o from_avro) para deserializar explícitamente las claves y los valores.
Escritura de datos en Kafka
A continuación se muestra un ejemplo de escritura de flujo a Kafka:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks también admite la semántica de escritura por lotes en receptores de datos de Kafka, como se muestra en el ejemplo siguiente:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configuración del escritor de Structured Streaming de Kafka
Importante
Databricks Runtime 13.3 LTS y versiones posteriores incluyen una versión más reciente de la biblioteca kafka-clients que permite escrituras idempotentes de manera predeterminada. Si un receptor de Kafka usa la versión 2.8.0 o inferior con las ACL configuradas pero sin habilitar IDEMPOTENT_WRITE, se producirá un error en la escritura con el mensaje de error org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.
Para resolver este error, actualice a Kafka versión 2.8.0 o posterior, o estableciendo .option(“kafka.enable.idempotence”, “false”) al configurar el sistema de escritura de Structured Streaming.
A continuación se muestran las opciones habituales para las operaciones de escritura en Kafka:
| Clave | Importancia | Valor predeterminado | Descripción |
|---|---|---|---|
kafka.boostrap.servers |
Lista separada por comas de <host:port> |
ninguno | Required. Configuración de Kafka bootstrap.servers. |
topic |
STRING |
sin establecer | Opcional. Establece el tema para todas las filas que se vayan a escribir. Esta opción invalida cualquier columna de tema que exista en los datos. |
includeHeaders |
BOOLEAN |
false |
Opcional. Indica si se van a incluir los encabezados de Kafka en la fila. |
Consulte la página Opciones para obtener la lista completa de opciones disponibles.
Esquema para el escritor de Kafka
Al escribir datos en Kafka, el dataframe proporcionado puede incluir los siguientes campos:
| Nombre de la columna | Obligatorio u opcional | Tipo |
|---|---|---|
key |
opcional |
STRING o BINARY |
value |
required |
STRING o BINARY |
headers |
opcional | ARRAY |
topic |
opcional (se omite si topic se establece como opción de escritor) |
STRING |
partition |
opcional | INT |
Autenticación
Azure Databricks admite varios métodos de autenticación para Kafka, incluidas las credenciales de servicio del catálogo de Unity, SASL/SSL y opciones específicas de la nube para AWS MSK, Azure Event Hubs y Google Cloud Managed Kafka. Consulte Autenticación.
Obtención de métricas de Kafka
Para monitorizar el desfase con respecto a Kafka en una consulta de streaming, use las métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest y minOffsetsBehindLatest. Estas métricas informan del retraso medio, máximo y mínimo de desplazamiento en todas las particiones de temas suscritos, en relación con los desplazamientos más recientes en Kafka. Consulte Lectura interactiva de métricas.
Nota:
En Databricks Runtime 17.1 y versiones posteriores, los desplazamientos de Kafka más recientes se capturan después de que se complete cada microproceso. En los temas que reciben continuamente datos, las métricas de trabajos pendientes pueden mostrar valores pequeños y persistentes que no son ceros. Este es el comportamiento esperado y no indica que la secuencia se esté retrasando.
En Databricks Runtime 17.0 y versiones posteriores, los desplazamientos de Kafka más recientes se capturan en la hora de inicio del microproceso. Las métricas de trabajos pendientes pueden devolverse 0 cuando las consultas de streaming consumen constantemente todos los registros disponibles al principio del microproceso.
Para calcular los datos restantes de una consulta que se van a leer, use la estimatedTotalBytesBehindLatest métrica . Esta métrica calcula el número total de bytes restantes en todas las particiones suscritas en función de los lotes procesados en los últimos 300 segundos. Puede modificar el período de tiempo usado para esta estimación estableciendo la bytesEstimateWindowLength opción .
Por ejemplo, para establecer la longitud de la ventana en 10 minutos:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Si ejecuta la transmisión en un cuaderno, puede visualizar estas métricas bajo la pestaña Datos sin procesar en el panel de progreso de la consulta en streaming:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Consulte la supervisión de consultas de transmisión estructurada en Azure Databricks para obtener más información.
Ejemplo de Kafka a Delta Lake
En el ejemplo siguiente se muestra un flujo de trabajo completo para transmitir continuamente datos de Kafka a una tabla de Delta Lake. Puede usar este enfoque para cargas de trabajo de ingesta de datos casi en tiempo real.
En este ejemplo se usa un esquema JSON fijo. Para otros formatos como Avro o Protobuf, use from_avro o from_protobuf. También puede integrar con un registro de esquema. Consulte Ejemplo con el Registro de esquemas.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);