Fonctions table de diffusion en continu read_pubsub

S’applique à :coche marquée oui Databricks SQL case marquée oui Databricks Runtime 13.3 LTS et versions ultérieures

Retourne une table avec des enregistrements lus depuis Pub/Sub à partir d’une rubrique. Prend uniquement en charge les requêtes de diffusion en continu.

Syntaxe

read_pubsub( { parameter => value } [, ...])

Arguments

read_pubsub nécessite un appel de paramètre nommé.

Les seuls arguments requis sont subscriptionId, projectIdet topicId. Tous les autres arguments sont facultatifs.

Pour obtenir des descriptions complètes des arguments, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.

Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Consultez fonction secrète.

Pour plus d’informations sur la configuration de l’accès à Pub/Sub, consultez Configurer l’accès à Pub/Sub.

Paramètre Type Description
subscriptionId STRING Obligatoire, identificateur unique attribué à un abonnement Pub/Sub.
projectId STRING Obligatoire, ID de projet Google Cloud associé à la rubrique Pub/Sub.
topicId STRING Obligatoire, ID ou nom de la rubrique Pub/Sub à laquelle vous abonner.
clientEmail STRING Adresse e-mail associée à un compte de service pour l’authentification.
clientId STRING ID client associé au compte de service pour l’authentification.
privateKeyId STRING ID de la clé privée associée au compte de service.
privateKey STRING Clé privée associée au compte de service pour l’authentification.

Ces arguments sont utilisés pour affiner davantage le réglage lors de la lecture à partir de Pub/Sub :

Paramètre Type Description
numFetchPartitions STRING Facultatif avec un nombre d’exécuteurs par défaut. Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement.
deleteSubscriptionOnStreamStop BOOLEAN Facultatif avec false par défaut. Si la valeur est définie sur vrai, l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine.
maxBytesPerTrigger STRING Une limite réversible de la taille du lot à traiter pendant chaque micro-lot déclenché. La valeur par défaut est « None » (Aucun).
maxRecordsPerFetch STRING Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements. La valeur par défaut est « 1 000 ».
maxFetchPeriod STRING La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. La valeur par défaut est « 10s ».

Retours

Table des enregistrements Pub/Sub avec le schéma suivant. La colonne d’attributs peut être null, mais toutes les autres colonnes ne le sont pas.

Name Type de données Nullable Standard Description
messageId STRING Non Identificateur unique du message Pub/Sub.
payload BINARY Non Le contenu du message Pub/Sub.
attributes STRING Oui Paires clé-valeur représentant les attributs du message Pub/Sub. Il s’agit d’une chaîne encodée json.
publishTimestampInMillis BIGINT Non L’horodatage lorsque le message a été publié, en millisecondes.
sequenceNumber BIGINT Non Identificateur unique de l’enregistrement dans sa partition.

Exemples

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Les données doivent maintenant être interrogées depuis le testing.streaming_table pour une analyse plus approfondie.

Requêtes erronées :

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);