Dela via


read_pubsub strömmande tabellvärdesfunktion

Gäller för: markerad ja Databricks SQL markerad ja Databricks Runtime 13.3 LTS och senare

Returnerar en tabell med poster som lästs från Pub/Sub från ett ämne. Stöder endast strömmande frågor.

Syntax

read_pubsub( { parameter => value } [, ...])

Argument

read_pubsub kräver namngiven parameteranrop.

De enda argument som krävs är subscriptionId, projectIdoch topicId. Alla andra argument är valfria.

Fullständiga argumentbeskrivningar finns i Konfigurera alternativ för pub-/underströmningsläsning.

Databricks rekommenderar att du använder hemligheter när du tillhandahåller auktoriseringsalternativ. Se hemlig funktion.

Mer information om hur du konfigurerar åtkomst till Pub/Sub finns i Konfigurera åtkomst till Pub/Sub.

Parameter Typ Beskrivning
subscriptionId STRING Obligatoriskt, den unika identifierare som tilldelats till en pub/underprenumeration.
projectId STRING Obligatoriskt, Google Cloud-projekt-ID som är associerat med ämnet Pub/Sub.
topicId STRING Obligatoriskt, ID eller namnet på puben/underavsnittet att prenumerera på.
clientEmail STRING E-postadressen som är associerad med ett tjänstkonto för autentisering.
clientId STRING Klient-ID:t som är associerat med tjänstkontot för autentisering.
privateKeyId STRING ID:t för den privata nyckel som är associerad med tjänstkontot.
privateKey STRING Den privata nyckel som är associerad med tjänstkontot för autentisering.

Dessa argument används för ytterligare finjustering vid läsning från Pub/Sub:

Parameter Typ Beskrivning
numFetchPartitions STRING Valfritt med standardantalet köre. Antalet parallella Spark-uppgifter som hämtar poster från en prenumeration.
deleteSubscriptionOnStreamStop BOOLEAN Valfritt med standard false. Om värdet är true tas prenumerationen som skickas till dataströmmen bort när direktuppspelningsjobbet upphör.
maxBytesPerTrigger STRING En mjuk gräns för batchstorleken som ska bearbetas under varje utlöst mikrobatch. Standardvärdet är "none".
maxRecordsPerFetch STRING Antalet poster som ska hämtas per aktivitet innan poster bearbetas. Standardvärdet är "1000".
maxFetchPeriod STRING Tidsåtgången för varje aktivitet som ska hämtas innan poster bearbetas. Standardvärdet är "10s".

Returer

En tabell med pub-/underposter med följande schema. Attributkolumnen kan vara null men alla andra kolumner är inte null.

Name Datatyp Kan ha värdet null Standard beskrivning
messageId STRING Nej Unik identifierare för pub-/undermeddelandet.
payload BINARY Nej Innehållet i pub-/undermeddelandet.
attributes STRING Ja Nyckel/värde-par som representerar attributen för pub-/undermeddelandet. Det här är en json-kodad sträng.
publishTimestampInMillis BIGINT Nej Tidsstämpeln när meddelandet publicerades i millisekunder.
sequenceNumber BIGINT Nej Den unika identifieraren för posten i dess fragment.

Exempel

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Data måste nu efterfrågas från testing.streaming_table för ytterligare analys.

Felaktiga frågor:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);