S’abonner à Google Pub/Sub

Utilisez le connecteur intégré pour vous abonner à Google Pub/Sub. Ce connecteur fournit une sémantique de traitement une seule fois pour les enregistrements de l’abonné.

Remarque

Pub/Sub peut publier des enregistrements dupliqués, ou les enregistrements peuvent arriver à l’abonné hors commande. Écrivez du code pour gérer les enregistrements dupliqués et hors ordre.

Configurer un flux Pub/Sub

L’exemple de code suivant illustre la syntaxe de base pour la configuration d’une lecture Structured Streaming à partir de Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Pour plus d’options de configuration, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.

Configurer l’accès à Pub/Sub

Les informations d’identification que vous configurez doivent avoir les rôles suivants.

Rôles Obligatoire ou facultatif Utilisation du rôle
roles/pubsub.viewer ou roles/viewer Requis Vérifie si l’abonnement existe et obtient l’abonnement.
roles/pubsub.subscriber Requis Récupère des données à partir d’un abonnement.
roles/pubsub.editor ou roles/editor Facultatif Permet la création d'un abonnement s'il n'existe pas et autorise l'utilisation de deleteSubscriptionOnStreamStop pour supprimer des abonnements lors de la terminaison du flux.

Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Les options suivantes sont requises pour autoriser une connexion :

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Comprendre le schéma Pub/Sub

Le schéma du flux correspond aux enregistrements extraits de Pub/Sub, comme décrit dans le tableau suivant.

Champ Catégorie
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurer les options de lecture en continu de Pub/Sub

Le tableau suivant décrit les options prises en charge pour Pub/Sub. Toutes les options sont configurées dans le cadre d'une lecture de Structured Streaming en utilisant la syntaxe .option("<optionName>", "<optionValue>").

Remarque

Certaines options de configuration Pub/Sub utilisent le concept d’extractions au lieu de micro-lots. Cela reflète les détails de l’implémentation interne et les options fonctionnent de la même façon que les corollaires dans d’autres connecteurs Structured Streaming, sauf que les enregistrements sont récupérés, puis traités.

Choix Valeur par défaut Descriptif
numFetchPartitions Défini sur une moitié du nombre d’exécuteurs présents lors de l’initialisation du flux. Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement.
deleteSubscriptionOnStreamStop false Si true, l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine.
maxBytesPerTrigger none Limite souple pour la taille du lot à traiter lors de chaque micro-lot déclenché.
maxRecordsPerFetch 1000 Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements.
maxFetchPeriod 10s La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. Accepte une chaîne de durée, par exemple, 1s pendant 1 seconde ou 1m pendant 1 minute. Databricks recommande d’utiliser la valeur par défaut.

Utiliser le traitement par lots incrémentiel avec Pub/Sub

Vous pouvez utiliser Trigger.AvailableNow pour consommer des enregistrements disponibles à partir des sources Pub/Sub en tant que lot incrémentiel.

Azure Databricks enregistre le timestamp lorsque vous commencez une lecture avec le paramètre Trigger.AvailableNow. Les enregistrements traités par le lot incluent toutes les données extraites précédemment et tous les enregistrements récemment publiés avec un timestamp inférieur au timestamp de démarrage du flux enregistré. Pour plus d’informations, consultez AvailableNow: Traitement par lots incrémentiel.

Surveiller les métriques de flux Pub/Sub

Les métriques de progression Structured Streaming indiquent le nombre d’enregistrements récupérés et prêts à être traités, la taille des enregistrements extraits et prêts à être traités, ainsi que le nombre de doublons vus depuis le début du flux. Voici un exemple de ces métriques :

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limites

Pub/Sub ne prend pas en charge l’exécution spéculative (spark.speculation).