Stream from Apache Pulsar
Important
This feature is in Public Preview.
In Databricks Runtime 14.1 and above, you can use Structured Streaming to stream data from Apache Pulsar on Azure Databricks.
Structured Streaming provides exactly-once processing semantics for data read from Pulsar sources.
Syntax example
The following is a basic example of using Structured Streaming to read from Pulsar:
# Python requires parentheses to continue a method chain across lines.
query = (
    spark.readStream
    .format("pulsar")
    .option("service.url", "pulsar://broker.example.com:6650")
    .option("topics", "topic1,topic2")
    .load()
)
You must always provide a service.url and one of the following options to specify topics:
- topic
- topics
- topicsPattern
For a complete list of options, see Configure options for Pulsar streaming read.
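The requirement above can be checked before a stream starts. The following is a minimal Python sketch, not part of the connector: the helper name pulsar_source_options is hypothetical, and it only builds and validates a plain dictionary using the option names from this page.

```python
from typing import Dict, Optional

# Option names taken from this page; exactly one of them must be set.
TOPIC_SELECTORS = ("topic", "topics", "topicsPattern")


def pulsar_source_options(service_url: str, **selectors: Optional[str]) -> Dict[str, str]:
    """Hypothetical helper: return the required Pulsar source options as a dict."""
    if not service_url:
        raise ValueError("service.url is required")
    unknown = set(selectors) - set(TOPIC_SELECTORS)
    if unknown:
        raise ValueError(f"unknown topic selector(s): {sorted(unknown)}")
    # Keep only selectors that were given a non-empty value.
    chosen = {name: value for name, value in selectors.items() if value}
    if len(chosen) != 1:
        raise ValueError("specify exactly one of topic, topics, or topicsPattern")
    return {"service.url": service_url, **chosen}


opts = pulsar_source_options("pulsar://broker.example.com:6650", topics="topic1,topic2")
print(opts)
```

The resulting dictionary can then be applied to a reader one entry at a time, for example by calling reader.option(name, value) in a loop over opts.items().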
Authenticate to Pulsar
Azure Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends using secrets when storing configuration details.
You can set the following options during stream configuration:
- pulsar.client.authPluginClassName
- pulsar.client.authParams
- pulsar.client.useKeyStoreTls
- pulsar.client.tlsTrustStoreType
- pulsar.client.tlsTrustStorePath
- pulsar.client.tlsTrustStorePassword
If the stream uses a PulsarAdmin, also set the following:
- pulsar.admin.authPluginClassName
- pulsar.admin.authParams
The following example demonstrates configuring authentication options:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
// Placeholder values for illustration; replace them with your own.
val startingOffsets = "latest"
val trustStorePath = "/var/private/tls/client.truststore.jks"
val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()
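The comment in the example above describes clientAuthParams as a comma-separated list of key:value pairs. As an illustration only, the following Python sketch assembles such a string from a dictionary. The helper name build_auth_params is hypothetical, the values are the placeholders from the comment, and the comma and colon checks are assumptions about what would break this format rather than documented connector rules.

```python
from typing import Mapping


def build_auth_params(params: Mapping[str, str]) -> str:
    """Hypothetical helper: join pairs as "key:value,key:value"."""
    for key, value in params.items():
        # Commas separate pairs, so a comma inside a key or value would corrupt the string.
        if "," in key or "," in value:
            raise ValueError(f"comma not allowed in auth parameter {key!r}")
        # A colon separates a key from its value, so keys must not contain one.
        if ":" in key:
            raise ValueError(f"colon not allowed in auth parameter name {key!r}")
    return ",".join(f"{key}:{value}" for key, value in params.items())


auth_params = build_auth_params({
    "keyStoreType": "JKS",
    "keyStorePath": "/var/private/tls/client.keystore.jks",
    "keyStorePassword": "clientpw",
})
print(auth_params)
```

In practice the finished string would be stored as a secret and retrieved with dbutils.secrets.get, as in the example above, rather than built from literal passwords in notebook code.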
Pulsar schema
The schema of records read from Pulsar depends on how topics have their schemas encoded.
- For topics with Avro or JSON schema, field names and field types are preserved in the resulting Spark DataFrame.
- For topics without schema or with a simple data type in Pulsar, the payload is loaded to a value column.
- If the reader is configured to read multiple topics with different schemas, set allowDifferentTopicSchemas to true to load the raw content to a value column.
Pulsar records have the following metadata fields:
Column | Type |
---|---|
__key | binary |
__topic | string |
__messageId | binary |
__publishTime | timestamp |
__eventTime | timestamp |
__messageProperties | map<String, String> |
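Because the metadata columns have fixed names, they can be separated from payload columns by name. The following Python sketch relies only on the column names in the table above. The helper split_columns is hypothetical, and the sample column list is made up for illustration.

```python
from typing import Iterable, List, Tuple

# Metadata column names as listed in the table above.
PULSAR_METADATA_COLUMNS = (
    "__key",
    "__topic",
    "__messageId",
    "__publishTime",
    "__eventTime",
    "__messageProperties",
)


def split_columns(columns: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: return (payload_columns, metadata_columns) in input order."""
    payload: List[str] = []
    metadata: List[str] = []
    for name in columns:
        if name in PULSAR_METADATA_COLUMNS:
            metadata.append(name)
        else:
            payload.append(name)
    return payload, metadata


# Made-up column list standing in for the columns of a stream read from Pulsar.
payload_cols, metadata_cols = split_columns(
    ["id", "amount", "__key", "__topic", "__publishTime"]
)
print(payload_cols)
print(metadata_cols)
```

With a streaming DataFrame df, calling split_columns(df.columns) would give the payload column names, which can then be passed to df.select(*payload_cols).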
Configure options for Pulsar streaming read
All options are configured as part of a Structured Streaming read using .option("<optionName>", "<optionValue>") syntax. You can also configure authentication using options. See Authenticate to Pulsar.
The following table describes required configurations for Pulsar. You must specify only one of the options topic, topics, or topicsPattern.
Option | Default value | Description |
---|---|---|
service.url | none | The Pulsar serviceUrl configuration for the Pulsar service. |
topic | none | A topic name string for the topic to consume. |
topics | none | A comma-separated list of the topics to consume. |
topicsPattern | none | A Java regex string to match on topics to consume. |
The following table describes other options supported for Pulsar:
Option | Default value | Description |
---|---|---|
predefinedSubscription | none | The predefined subscription name used by the connector to track Spark application progress. |
subscriptionPrefix | none | A prefix used by the connector to generate a random subscription to track Spark application progress. |
pollTimeoutMs | 120000 | The timeout for reading messages from Pulsar in milliseconds. |
waitingForNonExistedTopic | false | Whether the connector should wait until the desired topics are created. |
failOnDataLoss | true | Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy). |
allowDifferentTopicSchemas | false | If multiple topics with different schemas are read, use this parameter to turn off automatic schema-based topic value deserialization. Only the raw values are returned when this is true. |
startingOffsets | latest | If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. You can also provide a JSON string that specifies a specific offset. |
maxBytesPerTrigger | none | A soft limit on the maximum number of bytes to process per microbatch. If this is specified, admin.url must also be specified. |
admin.url | none | The Pulsar serviceHttpUrl configuration. Only needed when maxBytesPerTrigger is specified. |
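The dependency between maxBytesPerTrigger and admin.url described in the table can be enforced when options are assembled. The following is a minimal Python sketch that works on a plain dictionary of options. The helper with_rate_limit is hypothetical, the positive-value check is an assumption, and the admin URL shown is a placeholder rather than a documented default.

```python
from typing import Dict


def with_rate_limit(options: Dict[str, str], max_bytes: int, admin_url: str) -> Dict[str, str]:
    """Hypothetical helper: set maxBytesPerTrigger and admin.url together."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    if not admin_url:
        raise ValueError("admin.url is required when maxBytesPerTrigger is set")
    limited = dict(options)  # copy so the caller's dictionary is not modified
    limited["maxBytesPerTrigger"] = str(max_bytes)
    limited["admin.url"] = admin_url
    return limited


base = {"service.url": "pulsar://broker.example.com:6650", "topic": "topic1"}
# Placeholder admin URL; use the serviceHttpUrl of your own Pulsar cluster.
limited = with_rate_limit(base, 10 * 1024 * 1024, "http://broker.example.com:8080")
print(limited["maxBytesPerTrigger"], limited["admin.url"])
```

Keeping both options in one helper means a stream cannot be configured with a byte limit but no admin endpoint.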
You can also specify any Pulsar client, admin, and reader configurations using the following patterns:
Pattern | Link to configuration options |
---|---|
pulsar.client.* | Pulsar client configuration |
pulsar.admin.* | Pulsar admin configuration |
pulsar.reader.* | Pulsar reader configuration |
Construct starting offsets JSON
You can manually construct a message ID to specify a specific offset and pass it as a JSON string to the startingOffsets option. The following code example demonstrates this syntax:
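import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
// ledgerId and entryId (Long) and partitionIndex (Int) identify the message to start from.
// They are not defined in this snippet; supply the values for your topic.
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
// Serialize the topic-to-message-ID mapping as the JSON expected by startingOffsets.
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()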