Important
This feature is in Public Preview.
In Databricks Runtime 14.1 and above, you can use Structured Streaming to stream data from Apache Pulsar on Azure Databricks.
Structured Streaming provides exactly-once processing semantics for data read from Pulsar sources.
Syntax example
The following is a basic example of using Structured Streaming to read from Pulsar:
Python
query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)
Scala
val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
To read from Pulsar topics, you must provide a service.url and one of the following options:
- topic
- topics
- topicsPattern
For a complete list of options, see Configure options for Pulsar streaming read.
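The rule above (a service.url plus one topic selector) can be checked before the stream starts. The following is a minimal sketch rather than part of the connector: pulsar_source_options and read_pulsar are hypothetical helper names, and read_pulsar assumes an existing SparkSession is passed in.

```python
def pulsar_source_options(service_url, topic=None, topics=None, topics_pattern=None):
    """Build the required options for a Pulsar streaming read.

    Hypothetical helper: raises unless exactly one topic selector is set.
    """
    selectors = {"topic": topic, "topics": topics, "topicsPattern": topics_pattern}
    chosen = {name: value for name, value in selectors.items() if value is not None}
    if len(chosen) != 1:
        raise ValueError(
            "Specify exactly one of topic, topics, or topicsPattern; got: "
            + (", ".join(sorted(chosen)) or "none")
        )
    return {"service.url": service_url, **chosen}


def read_pulsar(spark, options):
    """Start a streaming read from Pulsar using an existing SparkSession."""
    return spark.readStream.format("pulsar").options(**options).load()


opts = pulsar_source_options(
    "pulsar://broker.example.com:6650", topics="topic1,topic2"
)
print(opts)
```

In a notebook, read_pulsar(spark, opts) would then return the streaming DataFrame. Failing early on a missing or duplicated selector keeps the error close to the configuration mistake instead of surfacing it when the stream starts.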
Authenticate to Pulsar
Azure Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends that you use secrets to store configuration details.
Available stream configuration options include the following:
- pulsar.client.authPluginClassName
- pulsar.client.authParams
- pulsar.client.useKeyStoreTls
- pulsar.client.tlsTrustStoreType
- pulsar.client.tlsTrustStorePath
- pulsar.client.tlsTrustStorePassword
If the stream uses a PulsarAdmin, you must set the following options:
- pulsar.admin.authPluginClassName
- pulsar.admin.authParams
Example
The following example demonstrates configuring authentication options:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
# starting_offsets and trust_store_path must be defined before this runs:
# starting_offsets is "latest", "earliest", or an offsets JSON string, and
# trust_store_path is the path to the truststore file.
query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
// startingOffsets and trustStorePath must be defined before this runs:
// startingOffsets is "latest", "earliest", or an offsets JSON string, and
// trustStorePath is the path to the truststore file.
val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()
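The examples above set only the pulsar.client.* authentication options. When the stream also uses a PulsarAdmin, the pulsar.admin.* options are required as well. The sketch below shows one way to keep the two sets together. It assumes the admin connection uses the same plugin class and parameters as the client, which may not hold for every deployment, and pulsar_auth_options is a hypothetical helper name.

```python
KEYSTORE_TLS_PLUGIN = "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls"


def pulsar_auth_options(auth_params, plugin_class=KEYSTORE_TLS_PLUGIN, uses_admin=False):
    """Return authentication options for the Pulsar client and, optionally, admin.

    Assumption: the admin reuses the client's plugin class and auth params.
    """
    options = {
        "pulsar.client.authPluginClassName": plugin_class,
        "pulsar.client.authParams": auth_params,
    }
    if uses_admin:
        options["pulsar.admin.authPluginClassName"] = plugin_class
        options["pulsar.admin.authParams"] = auth_params
    return options


# Placeholder value; in a notebook this would come from dbutils.secrets.get(...).
example_params = "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks"
auth_options = pulsar_auth_options(example_params, uses_admin=True)
print(sorted(auth_options))
```

The resulting dictionary can be passed to the reader with .options(**auth_options) alongside the other stream options.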
Pulsar schema
When you read from Pulsar, the schema of rows depends on the schemas of the source's topics.
- For topics with Avro or JSON schema, field names and field types are preserved in the resulting Spark DataFrame.
- For topics without schema or with a simple data type in Pulsar, the payload is loaded to a value column.
- If you configure the stream to read multiple topics with different schemas, set allowDifferentTopicSchemas to true to load the raw content to a value column.
Pulsar records have the following metadata fields:
| Column | Type |
|---|---|
| __key | binary |
| __topic | string |
| __messageId | binary |
| __publishTime | timestamp |
| __eventTime | timestamp |
| __messageProperties | map<String, String> |
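Because the metadata fields sit next to the payload fields in the same DataFrame, it can help to separate the two groups by name before selecting columns. The following is a small sketch under that assumption. split_columns is a hypothetical helper, and the id and amount payload fields are made-up examples of what an Avro or JSON topic might contain.

```python
# Metadata column names from the table above.
PULSAR_METADATA_COLUMNS = (
    "__key",
    "__topic",
    "__messageId",
    "__publishTime",
    "__eventTime",
    "__messageProperties",
)


def split_columns(columns):
    """Split DataFrame column names into (payload, metadata) lists, keeping order."""
    metadata = [c for c in columns if c in PULSAR_METADATA_COLUMNS]
    payload = [c for c in columns if c not in PULSAR_METADATA_COLUMNS]
    return payload, metadata


# In a notebook, the input would be df.columns for the streaming DataFrame.
columns = ["id", "amount", "__key", "__topic", "__messageId",
           "__publishTime", "__eventTime", "__messageProperties"]
payload_cols, metadata_cols = split_columns(columns)
print(payload_cols)  # ['id', 'amount']
```

A follow-up such as df.select(*payload_cols, "__topic", "__publishTime") would then keep the payload plus only the metadata that downstream logic needs.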
Configure options for Pulsar streaming read
Configure all of the following options with .option("<optionName>", "<optionValue>") syntax for read streams. You can also configure authentication using .options(). See Authenticate to Pulsar.
The following table describes required configurations for Pulsar. You must specify service.url and exactly one of the options topic, topics, or topicsPattern.
| Option | Default value | Description |
|---|---|---|
| service.url | none | The Pulsar serviceUrl configuration for the Pulsar service. |
| topic | none | A topic name string for the topic to consume. |
| topics | none | A comma-separated list of the topics to consume. |
| topicsPattern | none | A Java regex string to match on topics to consume. |
The following table describes other options supported for Pulsar:
| Option | Default value | Description |
|---|---|---|
| predefinedSubscription | none | The predefined subscription name used by the connector to track Spark application progress. |
| subscriptionPrefix | none | A prefix used by the connector to generate a random subscription to track Spark application progress. |
| pollTimeoutMs | 120000 | The timeout for reading messages from Pulsar in milliseconds. |
| waitingForNonExistedTopic | false | Whether the connector should wait until the desired topics are created. |
| failOnDataLoss | true | Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy). |
| allowDifferentTopicSchemas | false | If multiple topics with different schemas are read, use this option to turn off automatic schema-based topic value deserialization. Only the raw values are returned when this is true. |
| startingOffsets | latest | If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. You can also specify a JSON string for a specific offset. |
| maxBytesPerTrigger | none | A soft limit for the maximum number of bytes to process per micro-batch. If you specify this option, you must also specify admin.url. |
| admin.url | none | The Pulsar serviceHttpUrl configuration. Required when maxBytesPerTrigger is specified. |
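The dependency between maxBytesPerTrigger and admin.url is easy to miss when options are assembled in several places. The sketch below adds both together and refuses to set the limit without an admin URL. with_max_bytes_per_trigger is a hypothetical helper, and the http://broker.example.com:8080 address is a placeholder, not a value from this page.

```python
def with_max_bytes_per_trigger(options, max_bytes, admin_url=None):
    """Return a copy of options with maxBytesPerTrigger and admin.url set.

    Raises ValueError if no admin URL is available, because the limit
    requires admin.url to be specified.
    """
    resolved_admin_url = admin_url or options.get("admin.url")
    if not resolved_admin_url:
        raise ValueError("maxBytesPerTrigger requires admin.url to be set")
    if max_bytes <= 0:
        raise ValueError("maxBytesPerTrigger must be a positive number of bytes")
    updated = dict(options)  # leave the caller's dictionary untouched
    updated["admin.url"] = resolved_admin_url
    updated["maxBytesPerTrigger"] = str(max_bytes)
    return updated


base = {"service.url": "pulsar://broker.example.com:6650", "topic": "topic1"}
limited = with_max_bytes_per_trigger(
    base,
    10 * 1024 * 1024,  # soft limit of 10 MiB per micro-batch
    admin_url="http://broker.example.com:8080",  # placeholder admin endpoint
)
print(limited["maxBytesPerTrigger"])  # 10485760
```

Returning a copy keeps the base options reusable for streams that do not need a rate limit.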
You can also specify any Pulsar client, admin, and reader configurations using the following patterns:
| Pattern | Configuration options |
|---|---|
| pulsar.client.* | Pulsar client configuration |
| pulsar.admin.* | Pulsar admin configuration |
| pulsar.reader.* | Pulsar reader configuration |
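One way to apply these patterns is to keep the underlying Pulsar settings in plain dictionaries and add the prefix in a single place. The following is a minimal sketch. prefixed_options is a hypothetical helper, and the setting names in the usage lines are taken from the authentication options on this page rather than from a full list of Pulsar configurations.

```python
ALLOWED_SCOPES = ("client", "admin", "reader")


def prefixed_options(scope, config):
    """Map Pulsar setting names to pulsar.<scope>.<name> stream options."""
    if scope not in ALLOWED_SCOPES:
        raise ValueError(f"scope must be one of {ALLOWED_SCOPES}, got {scope!r}")
    options = {}
    for name, value in config.items():
        # Option values are passed as strings; booleans become "true"/"false".
        text = str(value).lower() if isinstance(value, bool) else str(value)
        options[f"pulsar.{scope}.{name}"] = text
    return options


client_options = prefixed_options(
    "client", {"useKeyStoreTls": True, "tlsTrustStoreType": "JKS"}
)
print(client_options)
```

The output can be merged with other option dictionaries and passed to the reader with .options(**client_options).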
Construct starting offsets JSON
To start reading from a specific message, build an offsets JSON string from a custom message ID and pass it to the startingOffsets option, as in the following Scala example:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
// ledgerId and entryId (Long) and partitionIndex (Int) identify the message
// to start from; define them before this runs.
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
// Serialize the topic-to-message-ID mapping as an offsets JSON string.
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()