Compartir vía


Preguntas más frecuentes

Preguntas más frecuentes sobre el uso de Kafka con Azure Databricks.

¿Por qué obtengo un error de que no se admite o no se reconoce una opción de Kafka?

Un error común es olvidar el kafka. prefijo al establecer las opciones de configuración nativa de Kafka. Todas las opciones que se pasan directamente al cliente de Kafka deben tener kafka.el prefijo :

# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")

# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

Las opciones específicas del conector de Kafka de Spark (como subscribe, startingOffsets, maxOffsetsPerTrigger) no necesitan el prefijo. Consulte Opciones para obtener la lista completa.

¿Por qué recibo un error sobre las clases sombreadas de Kafka?

Azure Databricks requiere el uso de clases de Kafka sombreadas (prefijo con kafkashaded. o shadedmskiam.). Si ve errores como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, debe usar los nombres de clase sombreados:

  • org.apache.kafka.* Las clases requieren el kafkashaded. prefijo. Por ejemplo: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
  • software.amazon.msk.* Las clases requieren el shadedmskiam. prefijo. Por ejemplo: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule

¿Por qué recibo un TimeoutException al conectarse a Kafka?

Las causas más comunes son:

  • Conectividad de red: el clúster de computación no puede comunicarse con los brokers de Kafka. Compruebe las reglas de firewall, los grupos de seguridad y las configuraciones de VPC.
  • Servidores de arranque incorrectos: compruebe que el nombre de host y el kafka.bootstrap.servers puerto son correctos.
  • Resolución DNS: Asegúrese de que los nombres de host de los brokers de Kafka se puedan resolver desde la red de Azure Databricks.
  • Problemas de SSL/TLS: si usa SSL, compruebe que los certificados están configurados correctamente.

Para las configuraciones de emparejamiento de Private Link o VPC, asegúrese de que las rutas de red correctas estén configuradas.

¿Debo usar el modo de procesamiento por lotes o streaming para Kafka?

Depende de su caso de uso:

  • Modo de streaming (spark.readStream): use cuando necesite procesamiento continuo de datos o ingesta de baja latencia.
  • Modo por lotes (spark.read): utilice para cargas de datos únicas, reposiciones o depuración. Requiere tanto startingOffsets como endingOffsets.

Consulte Configuración de intervalos de desencadenador de Structured Streaming para obtener más información sobre cómo configurar intervalos de desencadenador, como AvailableNow, ProcessingTimey el modo en tiempo real.

¿Puedo leer varios temas de Kafka en una sola secuencia?

Sí, puede usar:

  • subscribe: proporcione una lista separada por comas de temas, por ejemplo .option("subscribe", "topic1,topic2").
  • subscribePattern: use un patrón regex de Java para buscar coincidencias con los nombres de temas, por ejemplo .option("subscribePattern", "topic-.*").

¿Cómo uso Kafka con canalizaciones declarativas de Spark de Lakeflow?

Las canalizaciones declarativas de Spark de Lakeflow proporcionan compatibilidad nativa con orígenes de Kafka. Puede definir una tabla de streaming que lea de Kafka:

Python

import dlt

@dlt.table
def kafka_bronze():
  return (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>'
);

Consulte Carga de datos en canalizaciones para obtener más información sobre los orígenes de transmisión en las Canalizaciones Declarativas de Lakeflow Spark.

¿Cómo deserializar las columnas de clave y valor de Kafka?

Las key columnas y value se devuelven como binarias (BINARY tipo). Use operaciones de DataFrame para deserializarlas en función del formato de datos:

¿Por qué recibo un error de escritura idempotent?

Databricks Runtime 13.3 LTS y versiones posteriores incluyen una versión más reciente de la biblioteca kafka-clients que permite escrituras idempotentes de manera predeterminada. Si el clúster de Kafka usa la versión 2.8.0 o inferior con las ACL configuradas pero sin IDEMPOTENT_WRITE habilitado, la escritura falla con: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Para resolver este error, actualice a Kafka versión 2.8.0 o posterior, o estableciendo .option("kafka.enable.idempotence", "false") al configurar el sistema de escritura de Structured Streaming.

¿Qué es KAFKA_DATA_LOSS_ERROR y cómo puedo resolverlo?

Este error se produce cuando la fuente de Kafka detecta que los desplazamientos almacenados en el punto de control ya no están disponibles en Kafka, por lo general, porque:

  • El flujo se había pausado más tiempo que el período de retención de Kafka.
  • Los datos del tópico de Kafka se eliminaron o el tópico se recreó.
  • El broker de Kafka experimentó pérdida de datos.

Para resolver este problema:

  • Si la pérdida de datos es aceptable: establézcala .option("failOnDataLoss", "false") para permitir que la secuencia continúe desde el desplazamiento disponible más antiguo.
  • Si la pérdida de datos no es aceptable: restablezca el punto de control y reprocesar desde los earliest offsets, o restaure los datos de Kafka que faltan.

Consulte KAFKA_DATA_LOSS condición de error para obtener más información.

¿Cómo puedo controlar la velocidad a la que se leen los datos de Kafka?

Use la maxOffsetsPerTrigger opción para limitar el número de desplazamientos (aproximadamente el número de registros) procesados por microlote. Esto ayuda a evitar lotes grandes que podrían sobrecargar el procesamiento descendente o provocar problemas de memoria al ponerse al día en un trabajo pendiente.

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>',
  maxOffsetsPerTrigger => '10000'
);

Como alternativa, use opciones como minPartitions o maxRecordsPerPartition para controlar cuántas particiones de Spark se crean para cada lote.

¿Cómo puedo supervisar cuánto se retrasa mi flujo respecto a los últimos desplazamientos de Kafka?

Utilice las métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest y minOffsetsBehindLatest disponibles en el progreso de la consulta de streaming. Estos informan cuántos desplazamientos detrás del último desplazamiento disponible de su flujo se encuentran en todas las particiones de temas suscritos. Consulte Supervisión de consultas de Structured Streaming en Azure Databricks.

También puede usar estimatedTotalBytesBehindLatest para calcular el total de bytes de datos que aún no se han procesado.

¿Por qué la inicialización de la secuencia de Kafka es lenta?

Los flujos de Kafka requieren tiempo para:

  1. Conéctese al clúster de Kafka y capture los metadatos.
  2. Descubrir particiones de temas.
  3. Capturar desplazamientos iniciales.

En el caso de los clústeres de Kafka locales o remotos, la latencia de red puede afectar significativamente al tiempo de inicialización. Si ejecuta canalizaciones desencadenadas o programadas con reinicios frecuentes, considere la posibilidad de usar el modo de streaming continuo para evitar la sobrecarga de inicialización repetida.

¿Por qué no se agregan más ejecutores de Spark que aumentan el rendimiento de Kafka?

Una vez que los agentes de Kafka se saturan, agregar más ejecutores de Spark aumenta el costo sin aumentar el rendimiento.

Señales de que Kafka es el cuello de botella:

  • El rendimiento se estanca a pesar de agregar más núcleos.
  • La utilización de CPU o red del agente de Kafka es alta.
  • Las tareas de Spark se completan rápidamente, pero esperan nuevos datos.

Para resolverlo, escale el clúster de Kafka agregando agentes o aumentando los recuentos de particiones para distribuir la carga.

¿Cómo puedo optimizar el costo y el uso de proceso para el streaming de Kafka?

Para los modos microlote y AvailableNow:

  • Tamaño adecuado del clúster: supervise las métricas y establezca un tamaño de clúster fijo adecuado para la carga máxima.
  • Uso maxOffsetsPerTrigger: limite los tamaños de lote para controlar el uso de recursos durante los picos de carga.
  • Evitar el escalado automático: los trabajos de streaming se ejecutan continuamente y la adición o eliminación de nodos provoca una sobrecarga de reequilibrio de tareas.
  • Reducción de la desviación de datos: las particiones sesgadas causan que algunas tareas procesen significativamente más datos que otras, lo que lleva a rezagados que ralentizan la finalización general de los lotes y desperdician recursos de cómputo en tareas inactivas. Use la minPartitions opción de dividir particiones grandes de Kafka en particiones spark más pequeñas para un procesamiento más equilibrado.

Para el modo en tiempo real, el ajuste de tamaño correcto es especialmente importante porque las tareas pueden permanecer inactivas mientras esperan datos. Consideraciones clave:

  • Establézcalo maxPartitions para que cada tarea controle varias particiones de Kafka para reducir la sobrecarga.
  • Ajuste spark.sql.shuffle.partitions para trabajos intensivos en mezcla de datos.

Consulte Modo en tiempo real en Structured Streaming para obtener instrucciones sobre el ajuste de tamaño de los clústeres para el modo en tiempo real.

¿Por qué mi flujo no devuelve ningún registro aunque existan datos en el tema?

Las causas más comunes son:

  • Configuración startingOffsets incorrecta: el valor predeterminado es latest, que solo lee los nuevos datos que llegan después de que se inicie el flujo. Configure startingOffsets a earliest para leer los datos existentes.
  • Nombre de tema incorrecto: compruebe que se está suscribiendo al tema correcto.
  • Problemas de autenticación: es posible que el flujo se haya conectado correctamente, pero no tiene permisos para leer del tópico. Compruebe las ACL de Kafka.
  • Expiración de desplazamiento: si la secuencia se detuvo durante mucho tiempo y los desplazamientos del punto de control han expirado (eliminados por la retención de Kafka), es posible que tenga que restablecer el punto de control o ajustar failOnDataLoss.