Sdílet prostřednictvím


read_pubsub streamovaná funkce s hodnotou tabulky

Platí pro: zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 13.3 LTS a vyšší

Vrátí tabulku se záznamy načtenými z pub/Sub z tématu. Podporuje pouze dotazy streamování.

Syntaxe

read_pubsub( { parameter => value } [, ...])

Argumenty

read_pubsub vyžaduje vyvolání pojmenovaného parametru.

Jedinými povinnými argumenty jsou subscriptionId, projectIda topicId. Všechny ostatní argumenty jsou volitelné.

Úplný popis argumentu najdete v tématu Konfigurace možností čtení pub/sub streamingu.

Databricks doporučuje používat tajné kódy při poskytování možností autorizace. Viz funkce tajného kódu.

Podrobnosti o konfiguraci přístupu k pub/sub naleznete v tématu Konfigurace přístupu k Pub/Sub.

Parametr Typ Popis
subscriptionId STRING Povinný identifikátor přiřazený k předplatnému Pub/Sub.
projectId STRING Povinné je ID projektu Google Cloud přidružené k tématu Pub/Sub.
topicId STRING Povinné, ID nebo název tématu Pub/Sub pro přihlášení k odběru.
clientEmail STRING E-mailová adresa přidružená k účtu služby pro ověřování.
clientId STRING ID klienta přidružené k účtu služby pro ověřování.
privateKeyId STRING ID privátního klíče přidruženého k účtu služby.
privateKey STRING Privátní klíč přidružený k účtu služby pro ověřování.

Tyto argumenty se používají k dalšímu vyladění při čtení z pub/sub:

Parametr Typ Popis
numFetchPartitions STRING Volitelné s výchozím počtem exekutorů. Početparalelních
deleteSubscriptionOnStreamStop BOOLEAN Volitelné s výchozím nastavením false. Pokud je nastavená hodnota true, odběr předaný streamu se po skončení úlohy streamování odstraní.
maxBytesPerTrigger STRING Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky. Výchozí hodnota je žádná.
maxRecordsPerFetch STRING Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů. Výchozí hodnota je 1000.
maxFetchPeriod STRING Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Výchozí hodnota je 10s.

Návraty

Tabulka záznamů Pub/Sub s následujícím schématem. Sloupec atributů může mít hodnotu null, ale všechny ostatní sloupce nemají hodnotu null.

Name Datový typ Vynulovatelné Standard Popis
messageId STRING No Jedinečný identifikátor zprávy Pub/Sub.
payload BINARY No Obsah zprávy Pub/Sub
attributes STRING Ano Páry klíč-hodnota představující atributy zprávy Pub/Sub Jedná se o řetězec kódovaný ve formátu JSON.
publishTimestampInMillis BIGINT No Časové razítko při publikování zprávy v milisekundách.
sequenceNumber BIGINT No Jedinečný identifikátor záznamu v rámci jeho horizontálního oddílu.

Příklady

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Data by se teď museli dotazovat z testing.streaming_table další analýzy.

Chybné dotazy:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);