Stream from Apache Pulsar

Important

This feature is in Public Preview.

In Databricks Runtime 14.1 and above, you can use Structured Streaming to stream data from Apache Pulsar on Azure Databricks.

Structured Streaming provides exactly-once processing semantics for data read from Pulsar sources.

Syntax example

The following is a basic example of using Structured Streaming to read from Pulsar:

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

You must always provide a service.url and one of the following options to specify topics:

  • topic
  • topics
  • topicsPattern
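As an illustration of the third option, the following sketch subscribes by regular expression instead of listing topics explicitly. The broker address and the pattern are placeholders, and the code assumes a Databricks notebook where spark is already defined. How the pattern resolves against a tenant and namespace is not covered here, so verify the matched topics in your environment.

```scala
import org.apache.spark.sql.DataFrame

// Assumes a Databricks notebook, where `spark` is predefined.
// The broker address and the topic pattern are placeholders.
val patternDf: DataFrame = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  // Java regex; intended to match topics whose names start with "topic".
  .option("topicsPattern", "topic.*")
  .load()
```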

For a complete list of options, see Configure options for Pulsar streaming read.
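The DataFrame returned by load() does not process any data until it is written to a sink. The following is a minimal sketch of a complete pipeline, not a recommended production setup. The checkpoint path and the target table name are hypothetical, and the code assumes a Databricks notebook where spark is predefined.

```scala
// Assumes a Databricks notebook, where `spark` is predefined.
val events = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", "topic1")
  .load()

// Write the stream to a Delta table. The checkpoint location and the
// table name are placeholders chosen for this example.
val streamingQuery = events.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/pulsar_events")
  .toTable("pulsar_events")
```

The checkpoint location is where Structured Streaming records its progress, which allows a restarted query to resume from where it stopped.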

Authenticate to Pulsar

Azure Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends using secrets when storing configuration details.

You can set the following options during stream configuration:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

If the stream uses a PulsarAdmin, also set the following:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

The following example demonstrates configuring authentication options:

// Retrieve sensitive values from a secret scope instead of hard-coding them.
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

// Placeholder values; replace them with the settings for your environment.
val trustStorePath = "/var/private/tls/client.truststore.jks"
val startingOffsets = "earliest"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()
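When the stream also uses a PulsarAdmin, the admin options must be set alongside the client options. One way to keep the two sets consistent is to build them in a single helper. The sketch below is illustrative only: the function name pulsarTlsOptions is hypothetical, and it assumes the admin connection uses the same authentication plugin and parameters as the client, which may not hold in every deployment.

```scala
// Builds the option map for keystore TLS authentication.
// Option names are the ones listed in this section; the helper itself is hypothetical.
def pulsarTlsOptions(
    authParams: String,
    trustStorePath: String,
    trustStorePassword: String,
    usesAdmin: Boolean): Map[String, String] = {
  val plugin = "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls"

  val clientOptions = Map(
    "pulsar.client.authPluginClassName" -> plugin,
    "pulsar.client.authParams" -> authParams,
    "pulsar.client.useKeyStoreTls" -> "true",
    "pulsar.client.tlsTrustStoreType" -> "JKS",
    "pulsar.client.tlsTrustStorePath" -> trustStorePath,
    "pulsar.client.tlsTrustStorePassword" -> trustStorePassword
  )

  // Assumption: the admin connection reuses the client's plugin and parameters.
  val adminOptions =
    if (usesAdmin) Map(
      "pulsar.admin.authPluginClassName" -> plugin,
      "pulsar.admin.authParams" -> authParams
    )
    else Map.empty[String, String]

  clientOptions ++ adminOptions
}
```

The resulting map can be applied in one call with spark.readStream.format("pulsar").options(pulsarTlsOptions(...)), with the secret values passed in as arguments.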

Pulsar schema

The schema of records read from Pulsar depends on how the schemas of the source topics are encoded.

  • For topics with Avro or JSON schema, field names and field types are preserved in the resulting Spark DataFrame.
  • For topics without schema or with a simple data type in Pulsar, the payload is loaded to a value column.
  • If the reader is configured to read multiple topics with different schemas, set allowDifferentTopicSchemas to true to load the raw content to a value column.
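The last case can be sketched as follows. This example assumes a Databricks notebook where spark is predefined, and it further assumes that the payloads are UTF-8 text (for example JSON documents), so that the raw bytes can be cast to a string. Neither the topic names nor the payload encoding come from a real deployment.

```scala
import org.apache.spark.sql.functions.col

// Assumes a Databricks notebook, where `spark` is predefined.
val raw = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  // Turn off schema-based deserialization so each record carries its raw value.
  .option("allowDifferentTopicSchemas", "true")
  .load()

// Assumption: payloads are UTF-8 text, so the raw value can be cast to a string.
val decoded = raw.select(
  col("__topic"),
  col("value").cast("string").as("payload")
)
```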

Pulsar records have the following metadata fields:

Column | Type
__key | binary
__topic | string
__messageId | binary
__publishTime | timestamp
__eventTime | timestamp
__messageProperties | map<String, String>
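The metadata fields are ordinary DataFrame columns and can be selected like any other. The sketch below projects a few of them. It assumes a Databricks notebook where spark is predefined, that message keys are UTF-8 text, and that messages carry a property named source, which is a hypothetical key chosen for illustration.

```scala
import org.apache.spark.sql.functions.col

// Assumes a Databricks notebook, where `spark` is predefined.
val df = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", "topic1")
  .load()

val metadata = df.select(
  col("__topic"),
  col("__publishTime"),
  col("__eventTime"),
  // Assumption: keys are UTF-8 text, so the binary key can be cast to a string.
  col("__key").cast("string").as("key"),
  // "source" is a hypothetical property key; the result is null when it is absent.
  col("__messageProperties").getItem("source").as("source")
)
```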

Configure options for Pulsar streaming read

All options are configured as part of a Structured Streaming read using .option("<optionName>", "<optionValue>") syntax. You can also configure authentication using options. See Authenticate to Pulsar.

The following table describes the required configurations for Pulsar. You must specify exactly one of the options topic, topics, or topicsPattern.

Option | Default value | Description
service.url | none | The Pulsar serviceUrl configuration for the Pulsar service.
topic | none | A topic name string for the topic to consume.
topics | none | A comma-separated list of the topics to consume.
topicsPattern | none | A Java regex string to match on topics to consume.

The following table describes other options supported for Pulsar:

Option | Default value | Description
predefinedSubscription | none | The predefined subscription name used by the connector to track Spark application progress.
subscriptionPrefix | none | A prefix used by the connector to generate a random subscription to track Spark application progress.
pollTimeoutMs | 120000 | The timeout for reading messages from Pulsar, in milliseconds.
waitingForNonExistedTopic | false | Whether the connector should wait until the desired topics are created.
failOnDataLoss | true | Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of a retention policy).
allowDifferentTopicSchemas | false | If multiple topics with different schemas are read, use this parameter to turn off automatic schema-based topic value deserialization. Only the raw values are returned when this is true.
startingOffsets | latest | If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. You can also specify a JSON string that identifies a specific offset.
maxBytesPerTrigger | none | A soft limit on the maximum number of bytes to process per microbatch. If this is specified, admin.url must also be specified.
admin.url | none | The Pulsar serviceHttpUrl configuration. Only needed when maxBytesPerTrigger is specified.
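As a sketch of how several of these options combine, the following read starts from the earliest offset and caps each microbatch at roughly 10 MiB, which in turn requires admin.url. The broker hostnames, the HTTP port, and the byte limit are placeholders, and the code assumes a Databricks notebook where spark is predefined.

```scala
// Assumes a Databricks notebook, where `spark` is predefined.
val limited = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  // Required because maxBytesPerTrigger is set; the URL is a placeholder.
  .option("admin.url", "http://broker.example.com:8080")
  .option("topic", "topic1")
  .option("startingOffsets", "earliest")
  // Soft limit per microbatch: 10 * 1024 * 1024 bytes.
  .option("maxBytesPerTrigger", "10485760")
  // Keep the query running if data has been removed, for example by retention.
  .option("failOnDataLoss", "false")
  .load()
```

Setting failOnDataLoss to false trades strictness for availability, so it is worth leaving at the default where missing data must be detected.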

You can also specify any Pulsar client, admin, and reader configurations using the following patterns:

Pattern | Link to configuration options
pulsar.client.* | Pulsar client configuration
pulsar.admin.* | Pulsar admin configuration
pulsar.reader.* | Pulsar reader configuration
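For example, a client or reader setting can be passed through by adding the matching prefix to its name. In the sketch below, operationTimeoutMs and receiverQueueSize are taken from the Pulsar Java client and reader configuration respectively; confirm the exact names and accepted values against the linked Pulsar documentation for your version. The values shown are arbitrary, and the code assumes a Databricks notebook where spark is predefined.

```scala
// Assumes a Databricks notebook, where `spark` is predefined.
val tuned = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", "topic1")
  // Pulsar client setting, passed through with the pulsar.client. prefix.
  .option("pulsar.client.operationTimeoutMs", "30000")
  // Pulsar reader setting, passed through with the pulsar.reader. prefix.
  .option("pulsar.reader.receiverQueueSize", "500")
  .load()
```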

Construct starting offsets JSON

You can manually construct a message ID that identifies a specific offset and pass it as a JSON string to the startingOffsets option. The following code example demonstrates this syntax:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

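val topic = "my-topic"
// ledgerId and entryId (Long) and partitionIndex (Int) identify the message to start from.
// Define them for your topic before running this example.
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

val query = spark.readStream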
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()
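If the ledger ID, entry ID, and partition index are not known directly, a message ID that was serialized earlier can be used instead. The sketch below is one possible approach, not part of the connector's documented workflow: the helper name startingOffsetsFrom is hypothetical, and it assumes the saved bytes were produced by Pulsar's own message ID serialization (for example, a value captured from the __messageId column).

```scala
import org.apache.pulsar.client.api.MessageId
import org.apache.spark.sql.pulsar.JsonUtils

// Hypothetical helper: rebuilds a MessageId from previously saved bytes
// and converts it to the JSON string expected by startingOffsets.
// Assumption: savedId holds a message ID in Pulsar's serialized form.
def startingOffsetsFrom(topic: String, savedId: Array[Byte]): String = {
  val msgId: MessageId = MessageId.fromByteArray(savedId)
  JsonUtils.topicOffsets(Map(topic -> msgId))
}
```

The returned string can then be passed to .option("startingOffsets", ...) in the same way as startOffsets in the preceding example.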