Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
Use the built-in connector to subscribe to Google Pub/Sub. This connector has exactly-once processing semantics for rows from the subscriber.
Note
Pub/Sub might publish duplicate rows, or rows might arrive to the subscriber out of order. You must write code to handle duplicate and out-of-order rows.
Configure a Pub/Sub stream
The following code example shows how to configure a Structured Streaming read from Pub/Sub and authenticate with private keys.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
For more configuration options, see Configure options for Pub/Sub streaming read.
Configure access to Pub/Sub
Your credentials must have the following roles:
| Roles | Required or optional | How role is used |
|---|---|---|
roles/pubsub.viewer or roles/viewer |
Required | Checks if subscription exists and gets subscription. |
roles/pubsub.subscriber |
Required | Fetches data from a subscription. |
roles/pubsub.editor or roles/editor |
Optional | Enables creation of a subscription if one doesn't exist and enables use of the deleteSubscriptionOnStreamStop to delete subscriptions on stream termination. |
Databricks recommends that you use secrets when using keys. The following options are required to authorize a connection:
clientEmailclientIdprivateKeyprivateKeyId
Understand the Pub/Sub schema
The schema for the stream matches the rows that are fetched from Pub/Sub, as described in the following table:
| Field | Type |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configure options for Pub/Sub streaming read
The following table describes the options supported for Pub/Sub. All options are configured with .option("<optionName>", "<optionValue>") on your stream reader.
Note
Some Pub/Sub configuration options use the concept of fetches instead of micro-batches. This is an internal implementation detail, and the options work similarly to other Structured Streaming connectors, except that rows are fetched and then processed.
| Key | Default value | Description |
|---|---|---|
numFetchPartitions |
Set to one half of the number of executors present at stream initialization. | The number of parallel Spark tasks that fetch rows from a subscription. |
deleteSubscriptionOnStreamStop |
false |
If true, the subscription passed to the stream is deleted when the streaming job ends. |
maxBytesPerTrigger |
none |
A soft limit for the batch size to be processed during each triggered micro-batch. |
maxRecordsPerFetch |
1000 |
The number of rows to fetch per task before processing rows. |
maxFetchPeriod |
10s |
The time duration for each task to fetch before processing rows. Accepts a duration string, for example, 1s for 1 second or 1m for 1 minute. Databricks recommends using the default value. |
Use incremental batch processing with Pub/Sub
You can use Trigger.AvailableNow to consume available rows from the Pub/Sub sources as an incremental batch.
Azure Databricks records the timestamp when you begin a read with the Trigger.AvailableNow setting. Rows processed by the batch include all previously fetched data and any newly published rows with a timestamp less than the recorded start timestamp. For more information, see AvailableNow: Incremental batch processing.
Monitor Pub/Sub streaming metrics
Structured Streaming progress metrics report the number of rows fetched and ready to process, the size of the rows fetched and ready to process, and the number of duplicates seen since stream start.
The following is an example of Pub/Sub metrics:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitations
Pub/Sub does not support speculative execution with spark.speculation.