Stream from Apache Pulsar

Important

This feature is in Public Preview.

In Databricks Runtime 14.1 and above, you can use Structured Streaming to stream data from Apache Pulsar on Azure Databricks.

Structured Streaming provides exactly-once processing semantics for data read from Pulsar sources.

Syntax example

The following is a basic example of using Structured Streaming to read from Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

To read from Pulsar topics, you must provide a service.url and one of the following options:

  • topic
  • topics
  • topicsPattern

For a complete list of options, see Configure options for Pulsar streaming read.

Authenticate to Pulsar

Azure Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends that you use secrets to store configuration details.

Available stream configuration options include the following:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

If the stream uses a PulsarAdmin, you must set the following options:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

The following example demonstrates configuring authentication options:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar schema

When you read from Pulsar, the schema of rows depends on the schemas of the source's topics.

  • For topics with Avro or JSON schema, field names and field types are preserved in the resulting Spark DataFrame.
  • For topics without schema or with a simple data type in Pulsar, the payload is loaded to a value column.
  • If you configure the stream to read multiple topics with different schemas, set allowDifferentTopicSchemas to load the raw content to a value column.

Pulsar records have the following metadata fields:

Column Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configure options for Pulsar streaming read

Configure all of the following options with .option("<optionName>", "<optionValue>") syntax for read streams. You can also configure authentication using .options(). See Authenticate to Pulsar.

The following table describes required configurations for Pulsar. You must specify only one of the options topic, topics or topicsPattern.

Option Default value Description
service.url none The Pulsar serviceUrl configuration for the Pulsar service.
topic none A topic name string for the topic to consume.
topics none A comma-separated list of the topics to consume.
topicsPattern none A Java regex string to match on topics to consume.

The following table describes other options supported for Pulsar:

Option Default value Description
predefinedSubscription none The predefined subscription name used by the connector to track spark application progress.
subscriptionPrefix none A prefix used by the connector to generate a random subscription to track spark application progress.
pollTimeoutMs 120000 The timeout for reading messages from Pulsar in milliseconds.
waitingForNonExistedTopic false Whether the connector should wait until the desired topics are created.
failOnDataLoss true Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy).
allowDifferentTopicSchemas false If multiple topics with different schemas are read, use this option to turn off automatic schema-based topic value deserialization. Only the raw values are returned when this is true.
startingOffsets latest If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. You can also specify a JSON string for a specific offset.
maxBytesPerTrigger none A soft limit for the maximum number of bytes to process per micro-batch. If you specify this option, you must also specify admin.url.
admin.url none The Pulsar serviceHttpUrl configuration. Required when maxBytesPerTrigger is specified.

You can also specify any Pulsar client, admin, and reader configurations using the following patterns:

Pattern Configuration options
pulsar.client.* Pulsar client configuration
pulsar.admin.* Pulsar admin configuration
pulsar.reader.* Pulsar reader configuration

Construct starting offsets JSON

To use a custom message ID that specifies an offset, as JSON, with the startingOffsets option, see the following example:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()