Megosztás a következőn keresztül:


read_pubsub adatfolyam táblázatértékű függvény

A következőkre vonatkozik:jelölje be az igennel jelölt jelölőnégyzetet Databricks SQL jelölje be az igennel jelölt jelölőnégyzetet Databricks Runtime 13.3 LTS és újabb

Egy olyan táblát ad vissza, amely egy témakörből olvas be rekordokat a Pub/Sub fájlból. Csak a streamelési lekérdezéseket támogatja.

Szemantika

read_pubsub( { parameter => value } [, ...])

Argumentumok

read_pubsubnévvel ellátott paraméterhívást igényel.

Az egyetlen kötelező argumentum az subscriptionId, projectIdés topicId. Az összes többi argumentum megadása nem kötelező.

Az argumentumok teljes leírását a Pub/Sub adatfolyam-olvasás beállításainak konfigurálása című témakörben találhatja meg.

A Databricks a titkos kódok használatát javasolja az engedélyezési lehetőségek megadásakor. Lásd: secret függvény.

A Pubhoz/Alhoz való hozzáférés konfigurálásáról további információt a Pubhoz/Alhoz való hozzáférés konfigurálása című témakörben talál.

Paraméter Típus Leírás
subscriptionId STRING Kötelező, a Pub/Sub előfizetéshez rendelt egyedi azonosító.
projectId STRING Kötelező megadni a Pub/Al témakörhöz társított Google Cloud-projektazonosítót.
topicId STRING Kötelező megadni annak a pub/altémakörnek az azonosítóját vagy nevét, amelyre feliratkozni szeretne.
clientEmail STRING A hitelesítéshez használt szolgáltatásfiókhoz társított e-mail-cím.
clientId STRING A szolgáltatásfiókhoz társított ügyfélazonosító hitelesítéshez.
privateKeyId STRING A szolgáltatásfiókhoz társított titkos kulcs azonosítója.
privateKey STRING A szolgáltatásfiókhoz társított titkos kulcs hitelesítéshez.

Ezek az argumentumok további finomhangolásra szolgálnak a Pub/Sub olvasásakor:

Paraméter Típus Leírás
numFetchPartitions STRING Opcióként megadható az alapértelmezett végrehajtók száma. Az párhuzamos Spark-feladatok száma, amelyek rekordokat kérnek le egy előfizetésből.
deleteSubscriptionOnStreamStop BOOLEAN Alapértelmezés szerint választható false. Ha igaz értékre van állítva, a streamnek átadott előfizetés a streamelési feladat befejeződésekor törlődik.
maxBytesPerTrigger STRING A kezdeményezett mikrokötegek során feldolgozandó köteg méretének puha korlátja. Az alapértelmezett érték a "nincs".
maxRecordsPerFetch STRING A rekordok feldolgozása előtt lekérendő rekordok száma tevékenységenként. Az alapértelmezett érték az "1000".
maxFetchPeriod STRING Az egyes feladatok időtartama a rekordok feldolgozása előtti betöltéshez. Az alapértelmezett érték a "10s".

Visszatérítések

Pub/Sub rekordok táblázata az alábbi sémával. Az attribútumok oszlopa lehet null, de az összes többi oszlop nem null.

Név Adattípus Nullázható Szabvány Leírás
messageId STRING Nem A Pub/Sub üzenet egyedi azonosítója.
payload BINARY Nem A Pub/Sub üzenet tartalma.
attributes STRING Igen A Pub/Sub üzenet attribútumait képviselő kulcs-érték párok. Ez egy json kódolt sztring.
publishTimestampInMillis BIGINT Nem Az üzenet közzétételének időbélyege ezredmásodpercben.
sequenceNumber BIGINT Nem A rekord egyedi azonosítója a shardon belül.

Példák

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
        privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

Az adatokat most le kell kérni a testing.streaming_table további elemzéshez.

Hibás lekérdezések:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetchLimit => '1000001'
);