Procesamiento de flujos con Apache Kafka y Azure Databricks

En este artículo se describe cómo puede usar Apache Kafka como origen o receptor al ejecutar cargas de trabajo de Structured Streaming en Azure Databricks.

Para más información sobre Kafka, consulte la documentación de Kafka.

Leer datos desde Kafka

A continuación se muestra un ejemplo de una transmisión leída desde Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks también admite la semántica de lectura por lotes para orígenes de datos de Kafka, como se muestra en el ejemplo siguiente:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Para la carga por lotes incremental, Databricks recomienda usar Kafka con Trigger.AvailableNow. Consulte Configuración del procesamiento por lotes incremental.

En Databricks Runtime 13.1 y versiones posteriores, Azure Databricks proporciona una función SQL para leer datos de Kafka. El streaming con SQL solo se admite en Delta Live Tables o con tablas de streaming en Databricks SQL. Consulte read_kafka función con valores de tabla.

Configuración del lector de Structured Streaming de Kafka

Azure Databricks proporciona la palabra clave kafka como formato de datos para configurar las conexiones a Kafka 0.10 y versiones posteriores.

A continuación se muestran las configuraciones más comunes para Kafka:

Hay varias maneras de especificar a qué temas suscribirse. Debe proporcionar únicamente uno de estos parámetros:

Opción Value Descripción
subscribe Lista de temas separados por comas. Lista de temas a la que suscribirse.
subscribePattern Cadena regex de Java. Patrón utilizado para suscribirse a los temas.
asignar Cadena JSON {"topicA":[0,1],"topic":[2,4]}. Elemento topicPartitions específico que se va a consumir.

Otras configuraciones importantes:

Opción Value Valor predeterminado Descripción
kafka.bootstrap.servers Lista separada por comas de host:puerto. empty [Obligatorio] Configuración bootstrap.servers de Kafka. Si descubre que no hay ningún dato de Kafka, compruebe primero la lista de direcciones del agente. Si la lista de direcciones del agente es incorrecta, es posible que no haya errores. Esto se debe a que el cliente de Kafka supone que los agentes estarán disponibles finalmente y, en caso de errores de red, vuelve a intentarlo indefinidamente.
failOnDataLoss true o false. true [Opcional] Indica si se debe producir un error en la consulta cuando sea posible que se hayan perdido datos. Las consultas pueden no leer datos de Kafka de forma permanente debido a muchos escenarios, como temas eliminados, el truncamiento de temas antes del procesamiento, etc. Intentamos calcular de forma conservadora si es posible que hayan perdido datos o no. A veces, esto puede provocar falsas alarmas. Establezca esta opción en false si no funciona según lo previsto o si desea que la consulta continúe con el procesamiento a pesar de la pérdida de datos.
minPartitions Entero >= 0, 0 = deshabilitado. 0 (Deshabilitado) [Opcional] Número mínimo de particiones que se van a leer de Kafka. Puede configurar Spark para que use un mínimo arbitrario de particiones para leer desde Kafka mediante la opción minPartitions. Normalmente, Spark tiene una asignación de 1 a 1 entre los elementos topicPartitions de Kafka y las particiones de Spark que consumen desde Kafka. Si establece la opción minPartitions en un valor mayor que el elemento topicPartitions de Kafka, Spark dividirá las particiones de Kafka de gran tamaño en partes más pequeñas. Esta opción se puede establecer en momentos de cargas máximas, de asimetría de datos y cuando el flujo se está quedando atrás para aumentar la tasa de procesamiento. Esto tiene el costo de inicializar los consumidores de Kafka en cada desencadenador, lo que puede afectar al rendimiento si usa SSL al conectarse a Kafka.
kafka.group.id Identificador del grupo de consumidores de Kafka. Sin establecer [Opcional] Identificador de grupo que se usará al leer desde Kafka. Utilícelo con precaución. De manera predeterminada, cada consulta genera un identificador de grupo único para leer datos. Esto garantiza que cada consulta tenga su propio grupo de consumidores que no se enfrenta a interferencias de ningún otro consumidor y, por tanto, pueda leer todas las particiones de sus temas suscritos. En algunos escenarios (por ejemplo, la autorización basada en grupos de Kafka), puede que desee usar identificadores de grupo autorizados específicos para leer datos. Opcionalmente, puede establecer el identificador de grupo. Sin embargo, haga esto con mucha precaución, ya que puede provocar un comportamiento inesperado.

* Es probable que las consultas de ejecución simultánea (tanto por lotes como por streaming) con el mismo identificador de grupo interfieran entre sí, lo que hace que cada consulta solo lea una parte de los datos.
* Esto también puede ocurrir cuando las consultas se inician o reinician en una sucesión rápida. Para minimizar estos problemas, establezca la configuración session.timeout.ms del consumidor de Kafka en un valor muy pequeño.
startingOffsets más antigua, más reciente latest [Opcional] Punto inicial cuando se inicia una consulta, ya sea "más antigua", que procede de los desplazamientos más antiguos o bien una cadena JSON, que especifica un desplazamiento inicial para cada TopicPartition. En JSON, se puede usar -2 como compensación para referirse a la más antigua y -1 a la más reciente. Nota: en el caso de las consultas por lotes, no se permite la más reciente (ya sea implícitamente o mediante el uso de -1 en JSON). En el caso de las consultas de streaming, esto solo se aplica cuando se inicia una nueva consulta y esa reanudación siempre se retomará desde dónde se dejó la consulta. Las particiones recién detectadas durante una consulta se iniciarán al principio.

Consulte Guía de integración de Kafka para streaming estructurado para ver otras configuraciones opcionales.

Esquema para registros de Kafka

El esquema de los registros de Kafka es:

Columna Tipo
key binary
value binary
topic string
partición int
offset long
timestamp long
timestampType int

key y value siempre se deserializan como matrices de bytes con ByteArrayDeserializer. Use operaciones de DataFrame (como cast("string")) para deserializar explícitamente las claves y los valores.

Escritura de datos en Kafka

A continuación se muestra un ejemplo de escritura de flujo a Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks también admite la semántica de escritura por lotes para receptores de datos de Kafka, como se muestra en el ejemplo siguiente:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configuración de escritor de Structured Streaming de Kafka

Importante

En Databricks Runtime 13.1 y versiones posteriores, se usa una versión más reciente de la biblioteca kafka-clients que habilita las escrituras idempotentes de forma predeterminada. Si un receptor de Kafka usa la versión 2.8.0 o inferior con las ACL configuradas pero sin habilitar IDEMPOTENT_WRITE, se producirá un error en la escritura con el mensaje de error org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Para resolver este error, actualice a Kafka versión 2.8.0 o posterior, o bien establezca .option(“kafka.enable.idempotence”, “false”) al configurar el sistema de escritura de Structured Streaming.

El esquema proporcionado a DataStreamWriter interactúa con el receptor de Kafka. Puede usar los campos siguientes:

Nombre de la columna Obligatorio u opcional Tipo
key opcional STRING o BINARY
value requerido STRING o BINARY
headers opcional ARRAY
topic opcional (se omite si topic se establece como opción de escritor) STRING
partition opcional INT

A continuación se muestran las opciones comunes establecidas al escribir en Kafka:

Opción Value Valor predeterminado Descripción
kafka.boostrap.servers Lista separada por comas de <host:port> None [Obligatorio] Configuración bootstrap.servers de Kafka.
topic STRING Sin establecer [Opcional] Establece el tema para que se escriban todas las filas. Esta opción invalida cualquier columna de tema que exista en los datos.
includeHeaders BOOLEAN false [Opcional] Si se van a incluir los encabezados de Kafka en la fila.

Consulte Guía de integración de Kafka para streaming estructurado para ver otras configuraciones opcionales.

Recuperación de métricas de Kafka

Puede obtener el promedio, el mínimo y el máximo del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos con las métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest y minOffsetsBehindLatest. Consulte Lectura interactiva de métricas.

Nota:

Disponible en Databricks Runtime 9.1 y versiones posteriores.

Obtenga el número total estimado de bytes de los temas suscritos que el proceso de consulta no haya consumido mediante una examinación del valor del elemento estimatedTotalBytesBehindLatest. Esta estimación se basa en los lotes procesados en los últimos 300 segundos. El período de tiempo en el que se basa la estimación se puede cambiar estableciendo la opción bytesEstimateWindowLength en otro valor. Por ejemplo, para establecerla en 10 minutos:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Si ejecuta el flujo en un cuaderno, puede ver estas métricas en la pestaña Datos sin procesar en el panel de progreso de la consulta de transmisión:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Uso de SSL para conectar Azure Databricks a Kafka

Para habilitar las conexiones SSL a Kafka, siga las instrucciones que se indican en Cifrado y autenticación con SSL de la documentación de Confluent. Puede proporcionar las configuraciones descritas allí, con el prefijo kafka., como opciones. Por ejemplo, especifique la ubicación del almacén de confianza en la propiedad kafka.ssl.truststore.location.

Databricks recomienda que:

  • Almacene los certificados en el almacenamiento de objetos en la nube. Puede restringir el acceso a los certificados solo a los clústeres que puedan acceder a Kafka. Vea Guía de gobernanza de datos.
  • Almacene las contraseñas de certificado como secretos en un ámbito de secretos.

En el ejemplo siguiente se usan ubicaciones de almacenamiento de objetos y secretos de Databricks para habilitar una conexión SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Conexión de Kafka en HDInsight a Azure Databricks

  1. Cree un clúster de Kafka de HDInsight.

    Consulte Conexión con Apache Kafka en HDInsight mediante una instancia de Azure Virtual Network para obtener instrucciones.

  2. Configure los agentes de Kafka para que anuncien la dirección correcta.

    Siga las instrucciones que se indican en Configuración de Kafka para anunciar direcciones IP. Si administra Kafka usted mismo en Azure Virtual Machines, asegúrese de que la configuración advertised.listeners de los agentes esté establecida en la dirección IP interna de los hosts.

  3. Cree un clúster de Azure Databricks.

  4. Empareje el clúster de Kafka con el clúster de Azure Databricks.

    Siga las instrucciones que se indican en Emparejamiento de redes virtuales.