Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für: Databricks SQL
Databricks Runtime 13.3 LTS und höher
Gibt eine Tabelle mit Datensätzen zurück, die von Pub/Sub aus einem Thema gelesen wurden. Unterstützt nur Streamingabfragen.
Syntax
read_pubsub( { parameter => value } [, ...])
Argumente
read_pubsub
erfordert einen benannten Parameteraufruf.
Die einzigen erforderlichen Argumente sind subscriptionId
, projectId
und topicId
. Alle anderen Argumente sind optional.
Eine vollständige Beschreibung der Argumente finden Sie unter Konfigurieren von Optionen für Pub/Sub-Streaming-Lesevorgänge.
Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Siehe secret
Funktion.
Ausführliche Informationen zum Konfigurieren des Zugriffs auf Pub/Sub finden Sie unter Konfigurieren des Zugriffs auf Pub/Sub.
Parameter | Typ | Beschreibung |
---|---|---|
subscriptionId |
STRING |
Erforderlich, der eindeutige Bezeichner, der einem Pub/Sub-Abonnement zugewiesen ist. |
projectId |
STRING |
Erforderlich, die Google Cloud-Projekt-ID, die dem Pub/Sub-Thema zugeordnet ist. |
topicId |
STRING |
Erforderlich, die ID oder der Name des Pub/Sub-Themas, das abonniert werden soll. |
clientEmail |
STRING |
Die E-Mail-Adresse, die einem Dienstkonto für die Authentifizierung zugeordnet ist. |
clientId |
STRING |
Die Client-ID, die dem Dienstkonto für die Authentifizierung zugeordnet ist. |
privateKeyId |
STRING |
Die ID des privaten Schlüssels, der dem Dienstkonto zugeordnet ist. |
privateKey |
STRING |
Der private Schlüssel, der dem Dienstkonto für die Authentifizierung zugeordnet ist. |
Diese Argumente werden für die weitere Optimierung beim Lesen von Pub/Sub verwendet:
Parameter | Typ | Beschreibung |
---|---|---|
numFetchPartitions |
STRING |
Optional mit der Standardanzahl von Executors. Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Optional mit Standardwert false . Bei „true“ wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht. |
maxBytesPerTrigger |
STRING |
Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. Der Standardwert ist "none". |
maxRecordsPerFetch |
STRING |
Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden. Der Standardwert ist "1000". |
maxFetchPeriod |
STRING |
Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Der Standardwert ist "10s". |
Gibt zurück
Eine Tabelle mit Pub/Sub-Datensätzen mit dem folgenden Schema. Die Attributspalte kann NULL sein, aber alle anderen Spalten sind nicht NULL.
Name | Datentyp | NULL-Werte zulässig | Norm | Beschreibung |
---|---|---|---|---|
messageId |
STRING |
Nein | Eindeutiger Bezeichner für die Pub/Sub-Nachricht. | |
payload |
BINARY |
Nein | Der Inhalt der Pub/Sub-Nachricht. | |
attributes |
STRING |
Ja | Schlüssel-Wert-Paare, die die Attribute der Pub/Sub-Nachricht darstellen. Dies ist eine JSON-codierte Zeichenfolge. | |
publishTimestampInMillis |
BIGINT |
Nein | Der Zeitstempel, zu dem die Nachricht veröffentlicht wurde, in Millisekunden. | |
sequenceNumber |
BIGINT |
Nein | Der eindeutige Bezeichner des Datensatzes innerhalb des Shards. |
Beispiele
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
clientEmail => secret('app-events', 'clientEmail'),
clientId => secret('app-events', 'clientId'),
privateKeyId => secret('app-events', 'privateKeyId'),
privateKey => secret('app-events', 'privateKey')
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic'
);
Die Daten müssen nun zur weiteren Analyse aus testing.streaming_table
abgefragt werden.
Fehlerhafte Abfragen:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project'
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
maxRecordsPerFetchLimit => '1000001'
);