Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Utilisez le connecteur intégré pour vous abonner à Google Pub/Sub. Ce connecteur fournit une sémantique de traitement une seule fois pour les enregistrements de l’abonné.
Remarque
Pub/Sub peut publier des enregistrements dupliqués, ou les enregistrements peuvent arriver à l’abonné hors commande. Écrivez du code pour gérer les enregistrements dupliqués et hors ordre.
Configurer un flux Pub/Sub
L’exemple de code suivant illustre la syntaxe de base pour la configuration d’une lecture Structured Streaming à partir de Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Pour plus d’options de configuration, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.
Configurer l’accès à Pub/Sub
Les informations d’identification que vous configurez doivent avoir les rôles suivants.
| Rôles | Obligatoire ou facultatif | Utilisation du rôle |
|---|---|---|
roles/pubsub.viewer ou roles/viewer |
Requis | Vérifie si l’abonnement existe et obtient l’abonnement. |
roles/pubsub.subscriber |
Requis | Récupère des données à partir d’un abonnement. |
roles/pubsub.editor ou roles/editor |
Facultatif | Permet la création d'un abonnement s'il n'existe pas et autorise l'utilisation de deleteSubscriptionOnStreamStop pour supprimer des abonnements lors de la terminaison du flux. |
Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Les options suivantes sont requises pour autoriser une connexion :
clientEmailclientIdprivateKeyprivateKeyId
Comprendre le schéma Pub/Sub
Le schéma du flux correspond aux enregistrements extraits de Pub/Sub, comme décrit dans le tableau suivant.
| Champ | Catégorie |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurer les options de lecture en continu de Pub/Sub
Le tableau suivant décrit les options prises en charge pour Pub/Sub. Toutes les options sont configurées dans le cadre d'une lecture de Structured Streaming en utilisant la syntaxe .option("<optionName>", "<optionValue>").
Remarque
Certaines options de configuration Pub/Sub utilisent le concept d’extractions au lieu de micro-lots. Cela reflète les détails de l’implémentation interne et les options fonctionnent de la même façon que les corollaires dans d’autres connecteurs Structured Streaming, sauf que les enregistrements sont récupérés, puis traités.
| Choix | Valeur par défaut | Descriptif |
|---|---|---|
numFetchPartitions |
Défini sur une moitié du nombre d’exécuteurs présents lors de l’initialisation du flux. | Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement. |
deleteSubscriptionOnStreamStop |
false |
Si true, l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine. |
maxBytesPerTrigger |
none |
Limite souple pour la taille du lot à traiter lors de chaque micro-lot déclenché. |
maxRecordsPerFetch |
1000 |
Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements. |
maxFetchPeriod |
10s |
La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. Accepte une chaîne de durée, par exemple, 1s pendant 1 seconde ou 1m pendant 1 minute. Databricks recommande d’utiliser la valeur par défaut. |
Utiliser le traitement par lots incrémentiel avec Pub/Sub
Vous pouvez utiliser Trigger.AvailableNow pour consommer des enregistrements disponibles à partir des sources Pub/Sub en tant que lot incrémentiel.
Azure Databricks enregistre le timestamp lorsque vous commencez une lecture avec le paramètre Trigger.AvailableNow. Les enregistrements traités par le lot incluent toutes les données extraites précédemment et tous les enregistrements récemment publiés avec un timestamp inférieur au timestamp de démarrage du flux enregistré. Pour plus d’informations, consultez AvailableNow: Traitement par lots incrémentiel.
Surveiller les métriques de flux Pub/Sub
Les métriques de progression Structured Streaming indiquent le nombre d’enregistrements récupérés et prêts à être traités, la taille des enregistrements extraits et prêts à être traités, ainsi que le nombre de doublons vus depuis le début du flux. Voici un exemple de ces métriques :
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limites
Pub/Sub ne prend pas en charge l’exécution spéculative (spark.speculation).