read_pubsub
streamelő táblaértékes függvény
A következőkre vonatkozik: Databricks SQL Databricks Runtime 13.3 LTS és újabb
Egy olyan táblát ad vissza, amely egy témakörből olvas be rekordokat a Pub/Sub fájlból. Csak a streamelési lekérdezéseket támogatja.
Syntax
read_pubsub( { parameter => value } [, ...])
Argumentumok
read_pubsub
névvel ellátott paraméterhívást igényel.
Az egyetlen kötelező argumentum az subscriptionId
, projectId
és topicId
. Az összes többi argumentum megadása nem kötelező.
Az argumentumok teljes leírását a Pub/Alstreamelési olvasás beállításainak konfigurálása című témakörben találhatja meg.
A Databricks a titkos kódok használatát javasolja az engedélyezési lehetőségek megadásakor. Lásd a titkos függvényt.
A Pubhoz/Alhoz való hozzáférés konfigurálásáról további információt a Pubhoz/Alhoz való hozzáférés konfigurálása című témakörben talál.
Paraméter | Típus | Leírás |
---|---|---|
subscriptionId |
STRING |
Kötelező, a Pub/Al előfizetéshez rendelt egyedi azonosító. |
projectId |
STRING |
Kötelező megadni a Pub/Al témakörhöz társított Google Cloud-projektazonosítót. |
topicId |
STRING |
Kötelező megadni annak a pub/altémakörnek az azonosítóját vagy nevét, amelyre feliratkozni szeretne. |
clientEmail |
STRING |
A hitelesítéshez használt szolgáltatásfiókhoz társított e-mail-cím. |
clientId |
STRING |
A szolgáltatásfiókhoz társított ügyfélazonosító hitelesítéshez. |
privateKeyId |
STRING |
A szolgáltatásfiókhoz társított titkos kulcs azonosítója. |
privateKey |
STRING |
A szolgáltatásfiókhoz társított titkos kulcs hitelesítéshez. |
Ezek az argumentumok további finomhangolásra szolgálnak a Pub/Sub olvasásakor:
Paraméter | Típus | Leírás |
---|---|---|
numFetchPartitions |
STRING |
A végrehajtók alapértelmezett számával nem kötelező megadni. Az előfizetésből rekordokat lekérő párhuzamos Spark-feladatok száma. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Alapértelmezés szerint false nem kötelező. Ha igaz értékre van állítva, a streamnek átadott előfizetés a streamelési feladat befejeződésekor törlődik. |
maxBytesPerTrigger |
STRING |
Az egyes aktivált mikrokötegek során feldolgozandó köteg méretének korlátja. Az alapértelmezett érték a "nincs". |
maxRecordsPerFetch |
STRING |
A rekordok feldolgozása előtt lekérendő rekordok száma tevékenységenként. Az alapértelmezett érték az "1000". |
maxFetchPeriod |
STRING |
Az egyes tevékenységek beolvasásának időtartama a rekordok feldolgozása előtt. Az alapértelmezett érték a "10s". |
Válaszok
Pub/Sub rekordok táblázata az alábbi sémával. Az attribútumok oszlopa lehet null, de az összes többi oszlop nem null.
Név | Adattípus | Nullázható | Standard | Leírás |
---|---|---|---|---|
messageId |
STRING |
Nem | A Pub/Sub üzenet egyedi azonosítója. | |
payload |
BINARY |
Nem | A Pub/Sub üzenet tartalma. | |
attributes |
STRING |
Igen | A Pub/Sub üzenet attribútumait képviselő kulcs-érték párok. Ez egy json kódolt sztring. | |
publishTimestampInMillis |
BIGINT |
Nem | Az üzenet közzétételének időbélyege ezredmásodpercben. | |
sequenceNumber |
BIGINT |
Nem | A rekord egyedi azonosítója a szegmensen belül. |
Példák
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Az adatokat most le kell kérni a testing.streaming_table
további elemzéshez.
Hibás lekérdezések:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);