Fonctions table de diffusion en continu read_pubsub
S’applique à : Databricks SQL Databricks Runtime 13.3 LTS et versions ultérieures
Retourne une table avec des enregistrements lus depuis Pub/Sub à partir d’une rubrique. Prend uniquement en charge les requêtes de diffusion en continu.
read_pubsub( { parameter => value } [, ...])
read_pubsub
nécessite un appel de paramètre nommé.
Les seuls arguments requis sont subscriptionId
, projectId
et topicId
. Tous les autres arguments sont facultatifs.
Pour obtenir des descriptions complètes des arguments, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.
Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Consultez fonction secrète.
Pour plus d’informations sur la configuration de l’accès à Pub/Sub, consultez Configurer l’accès à Pub/Sub.
Paramètre | Type | Description |
---|---|---|
subscriptionId |
STRING |
Obligatoire, identificateur unique attribué à un abonnement Pub/Sub. |
projectId |
STRING |
Obligatoire, ID de projet Google Cloud associé à la rubrique Pub/Sub. |
topicId |
STRING |
Obligatoire, ID ou nom de la rubrique Pub/Sub à laquelle vous abonner. |
clientEmail |
STRING |
Adresse e-mail associée à un compte de service pour l’authentification. |
clientId |
STRING |
ID client associé au compte de service pour l’authentification. |
privateKeyId |
STRING |
ID de la clé privée associée au compte de service. |
privateKey |
STRING |
Clé privée associée au compte de service pour l’authentification. |
Ces arguments sont utilisés pour affiner davantage le réglage lors de la lecture à partir de Pub/Sub :
Paramètre | Type | Description |
---|---|---|
numFetchPartitions |
STRING |
Facultatif avec un nombre d’exécuteurs par défaut. Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Facultatif avec false par défaut. Si la valeur est définie sur vrai, l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine. |
maxBytesPerTrigger |
STRING |
Une limite réversible de la taille du lot à traiter pendant chaque micro-lot déclenché. La valeur par défaut est « None » (Aucun). |
maxRecordsPerFetch |
STRING |
Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements. La valeur par défaut est « 1 000 ». |
maxFetchPeriod |
STRING |
La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. La valeur par défaut est « 10s ». |
Table des enregistrements Pub/Sub avec le schéma suivant. La colonne d’attributs peut être null, mais toutes les autres colonnes ne le sont pas.
Name | Type de données | Nullable | Standard | Description |
---|---|---|---|---|
messageId |
STRING |
Non | Identificateur unique du message Pub/Sub. | |
payload |
BINARY |
Non | Le contenu du message Pub/Sub. | |
attributes |
STRING |
Oui | Paires clé-valeur représentant les attributs du message Pub/Sub. Il s’agit d’une chaîne encodée json. | |
publishTimestampInMillis |
BIGINT |
Non | L’horodatage lorsque le message a été publié, en millisecondes. | |
sequenceNumber |
BIGINT |
Non | Identificateur unique de l’enregistrement dans sa partition. |
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Les données doivent maintenant être interrogées depuis le testing.streaming_table
pour une analyse plus approfondie.
Requêtes erronées :
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);