Applies to: Databricks SQL and Databricks Runtime 13.3 LTS and above
Returns a table of records read from a Pub/Sub topic. Supports streaming queries only.
Syntax
read_pubsub( { parameter => value } [, ...])
Arguments
read_pubsub requires named parameter invocation.
The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.
For full argument descriptions, see Configure options for Pub/Sub streaming read.
Databricks recommends using secrets when providing authorization options. See secret function.
For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.
| Parameter | Type | Description |
|---|---|---|
| `subscriptionId` | STRING | Required. The unique identifier assigned to a Pub/Sub subscription. |
| `projectId` | STRING | Required. The Google Cloud project ID associated with the Pub/Sub topic. |
| `topicId` | STRING | Required. The ID or name of the Pub/Sub topic to subscribe to. |
| `clientEmail` | STRING | The email address associated with a service account for authentication. |
| `clientId` | STRING | The client ID associated with the service account for authentication. |
| `privateKeyId` | STRING | The ID of the private key associated with the service account. |
| `privateKey` | STRING | The private key associated with the service account for authentication. |
These arguments are used for further fine-tuning when reading from Pub/Sub:
| Parameter | Type | Description |
|---|---|---|
| `numFetchPartitions` | STRING | Optional. The number of parallel Spark tasks that fetch records from a subscription. Defaults to the number of executors. |
| `deleteSubscriptionOnStreamStop` | BOOLEAN | Optional. If set to `true`, the subscription passed to the stream is deleted when the streaming job ends. The default is `false`. |
| `maxBytesPerTrigger` | STRING | A soft limit on the batch size to be processed during each triggered micro-batch. The default is `'none'`. |
| `maxRecordsPerFetch` | STRING | The number of records to fetch per task before processing records. The default is `'1000'`. |
| `maxFetchPeriod` | STRING | The time duration for each task to fetch before processing records. The default is `'10s'`. |
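For illustration, the fine-tuning options above can be combined in a single streaming read. This is a sketch only: the table, subscription, project, and topic names are placeholders, and the option values are illustrative, not recommendations.

```sql
-- Hypothetical example: tune fetch parallelism and batch sizing for a
-- high-throughput subscription. All identifiers and values are illustrative.
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    numFetchPartitions => '8',             -- fetch with 8 parallel tasks
    maxRecordsPerFetch => '5000',          -- up to 5000 records per task per fetch
    maxFetchPeriod => '30s',               -- each task fetches for at most 30 seconds
    deleteSubscriptionOnStreamStop => true -- clean up the subscription on stream stop
  );
```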
Returns
A table of Pub/Sub records with the following schema. The `attributes` column can be null; all other columns are not null.
| Name | Data type | Nullable | Description |
|---|---|---|---|
| `messageId` | STRING | No | Unique identifier for the Pub/Sub message. |
| `payload` | BINARY | No | The content of the Pub/Sub message. |
| `attributes` | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message. This is a JSON-encoded string. |
| `publishTimestampInMillis` | BIGINT | No | The timestamp when the message was published, in milliseconds. |
| `sequenceNumber` | BIGINT | No | The unique identifier of the record within its shard. |
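Because `payload` is BINARY and `attributes` is a JSON-encoded string, downstream queries typically decode both. A minimal sketch, assuming UTF-8 text payloads, flat string-to-string attributes, and the `testing.streaming_table` table from the examples below:

```sql
-- Decode the raw Pub/Sub columns for analysis (table name is illustrative).
> SELECT
    messageId,
    CAST(payload AS STRING)                      AS payload_text,   -- assumes UTF-8 payloads
    from_json(attributes, 'map<string, string>') AS attribute_map,  -- attributes is a JSON-encoded string
    timestamp_millis(publishTimestampInMillis)   AS published_at    -- millis epoch to TIMESTAMP
  FROM testing.streaming_table;
```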
Examples
```sql
-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    clientEmail => secret('app-events', 'clientEmail'),
    clientId => secret('app-events', 'clientId'),
    privateKeyId => secret('app-events', 'privateKeyId'),
    privateKey => secret('app-events', 'privateKey')
  );

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic'
  );
```
The data can now be queried from `testing.streaming_table` for further analysis.
Erroneous queries:
```sql
-- Missing the required topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project'
  );

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    maxRecordsPerFetchLimit => '1000001'
  );
```