Compartir a través de


Transmisión desde Apache Pulsar

Importante

Esta característica está en versión preliminar pública.

En Databricks Runtime 14.1 y versiones posteriores, puede usar Structured Streaming para transmitir datos de Apache Pulsar en Azure Databricks.

Structured Streaming proporciona una semántica de procesamiento exactamente una vez para los datos leídos de orígenes de Pulsar.

Ejemplo de sintaxis

A continuación se muestra un ejemplo básico del uso de Structured Streaming para leer desde Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Siempre debe proporcionar un service.url y una de las siguientes opciones para especificar temas:

  • topic
  • topics
  • topicsPattern

Para obtener una lista completa de las opciones, consulte Configurar opciones para la lectura de streaming de Pulsar.

Autenticación en Pulsar

Azure Databricks admite la autenticación de almacén de confianza y almacén de claves en Pulsar. Databricks recomienda usar secretos al almacenar los detalles de configuración.

Puede establecer las siguientes opciones durante la configuración de la transmisión:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Si la transmisión usa un PulsarAdmin, establezca también lo siguiente:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

En el ejemplo siguiente se muestra cómo configurar las opciones de autenticación:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema de Pulsar

El esquema de registros leídos de Pulsar depende de cómo los temas tengan codificados sus esquemas.

  • En los temas con el esquema Avro o JSON, los nombres de campo y los tipos de campo se conservan en el dataframe de Spark resultante.
  • Para temas sin esquema o con un tipo de datos simple en Pulsar, la carga se carga en una columna value.
  • Si el lector está configurado para leer varios temas con esquemas diferentes, establezca allowDifferentTopicSchemas para cargar el contenido sin procesar en una columna value.

Los registros de Pulsar tienen los siguientes campos de metadatos:

Column Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configuración de las opciones de lectura de streaming de Pulsar

Todas las opciones se configuran como parte de una lectura de Structured Streaming mediante la sintaxis .option("<optionName>", "<optionValue>"). También puede configurar la autenticación mediante opciones. Consulte Autenticación en Pulsar.

En la tabla siguiente se describen las configuraciones necesarias para Pulsar. Debe especificar solo una de las opciones topic, topics o topicsPattern.

Opción Valor predeterminado Descripción
service.url None Configuración de Pulsar serviceUrl para el servicio Pulsar.
topic None Cadena de nombre de tema para el tema que se va a consumir.
topics None Lista separada por comas de los temas que se van a consumir.
topicsPattern None Cadena regex de Java para que coincida con los temas que se van a consumir.

En la tabla siguiente se describen otras opciones admitidas para Pulsar:

Opción Valor predeterminado Descripción
predefinedSubscription None Nombre de suscripción predefinido usado por el conector para realizar un seguimiento del progreso de la aplicación de spark.
subscriptionPrefix None Prefijo usado por el conector para generar una suscripción aleatoria para realizar un seguimiento del progreso de la aplicación de spark.
pollTimeoutMs 120000 Tiempo de espera para leer mensajes de Pulsar en milisegundos.
waitingForNonExistedTopic false Si el conector debe esperar hasta que se creen los temas deseados.
failOnDataLoss true Controla si se produce un error en una consulta cuando se pierden los datos (por ejemplo, los temas se eliminan o los mensajes se eliminan debido a la directiva de retención).
allowDifferentTopicSchemas false Si se leen varios temas con esquemas diferentes, use este parámetro para desactivar la deserialización automática del valor del tema basado en esquemas. Solo se devuelven los valores sin procesar cuando es true.
startingOffsets latest Si es latest, el lector lee los registros más recientes después de empezar a ejecutarse. Si es earliest, el lector lee desde el desplazamiento más antiguo. El usuario también puede especificar una cadena JSON que especifique un desplazamiento específico.
maxBytesPerTrigger None Límite temporal del número máximo de bytes que queremos procesar por microlote. Si se especifica esto, admin.url también debe especificarse.
admin.url None Configuración de Pulsar serviceHttpUrl. Solo es necesario cuando maxBytesPerTrigger se especifica.

También puede especificar cualquier configuración de lector, administrador y cliente de Pulsar mediante los siguientes patrones:

Patrón Vínculo a las opciones de configuración
pulsar.client.* Configuración de cliente de Pulsar
pulsar.admin.* Configuración del administrador de Pulsar
pulsar.reader.* Configuración del lector de Pulsar

Construcción de desplazamientos iniciales JSON

Puede construir manualmente un identificador de mensaje para especificar un desplazamiento específico y pasarlo como JSON a la opción startingOffsets. En el ejemplo de código siguiente se muestra esta sintaxis:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()