Megosztás a következőn keresztül:


read_pubsub streamelő táblaértékes függvény

A következőkre vonatkozik:jelölje be az igennel jelölt jelölőnégyzetet Databricks SQL jelölje be az igennel jelölt jelölőnégyzetet Databricks Runtime 13.3 LTS és újabb

Egy olyan táblát ad vissza, amely egy témakörből olvas be rekordokat a Pub/Sub fájlból. Csak a streamelési lekérdezéseket támogatja.

Syntax

read_pubsub( { parameter => value } [, ...])

Argumentumok

read_pubsubnévvel ellátott paraméterhívást igényel.

Az egyetlen kötelező argumentum az subscriptionId, projectIdés topicId. Az összes többi argumentum megadása nem kötelező.

Az argumentumok teljes leírását a Pub/Alstreamelési olvasás beállításainak konfigurálása című témakörben találhatja meg.

A Databricks a titkos kódok használatát javasolja az engedélyezési lehetőségek megadásakor. Lásd a titkos függvényt.

A Pubhoz/Alhoz való hozzáférés konfigurálásáról további információt a Pubhoz/Alhoz való hozzáférés konfigurálása című témakörben talál.

Paraméter Típus Leírás
subscriptionId STRING Kötelező, a Pub/Al előfizetéshez rendelt egyedi azonosító.
projectId STRING Kötelező megadni a Pub/Al témakörhöz társított Google Cloud-projektazonosítót.
topicId STRING Kötelező megadni annak a pub/altémakörnek az azonosítóját vagy nevét, amelyre feliratkozni szeretne.
clientEmail STRING A hitelesítéshez használt szolgáltatásfiókhoz társított e-mail-cím.
clientId STRING A szolgáltatásfiókhoz társított ügyfélazonosító hitelesítéshez.
privateKeyId STRING A szolgáltatásfiókhoz társított titkos kulcs azonosítója.
privateKey STRING A szolgáltatásfiókhoz társított titkos kulcs hitelesítéshez.

Ezek az argumentumok további finomhangolásra szolgálnak a Pub/Sub olvasásakor:

Paraméter Típus Leírás
numFetchPartitions STRING A végrehajtók alapértelmezett számával nem kötelező megadni. Az előfizetésből rekordokat lekérő párhuzamos Spark-feladatok száma.
deleteSubscriptionOnStreamStop BOOLEAN Alapértelmezés szerint falsenem kötelező. Ha igaz értékre van állítva, a streamnek átadott előfizetés a streamelési feladat befejeződésekor törlődik.
maxBytesPerTrigger STRING Az egyes aktivált mikrokötegek során feldolgozandó köteg méretének korlátja. Az alapértelmezett érték a "nincs".
maxRecordsPerFetch STRING A rekordok feldolgozása előtt lekérendő rekordok száma tevékenységenként. Az alapértelmezett érték az "1000".
maxFetchPeriod STRING Az egyes tevékenységek beolvasásának időtartama a rekordok feldolgozása előtt. Az alapértelmezett érték a "10s".

Válaszok

Pub/Sub rekordok táblázata az alábbi sémával. Az attribútumok oszlopa lehet null, de az összes többi oszlop nem null.

Név Adattípus Nullázható Standard Leírás
messageId STRING Nem A Pub/Sub üzenet egyedi azonosítója.
payload BINARY Nem A Pub/Sub üzenet tartalma.
attributes STRING Igen A Pub/Sub üzenet attribútumait képviselő kulcs-érték párok. Ez egy json kódolt sztring.
publishTimestampInMillis BIGINT Nem Az üzenet közzétételének időbélyege ezredmásodpercben.
sequenceNumber BIGINT Nem A rekord egyedi azonosítója a szegmensen belül.

Példák

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Az adatokat most le kell kérni a testing.streaming_table további elemzéshez.

Hibás lekérdezések:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);