Fonctions table de diffusion en continu read_pulsar

S’applique à :check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 et versions ultérieures

Important

Cette fonctionnalité est disponible en préversion publique.

Retourne une table avec des enregistrements lus depuis Pulsar.

Cette fonction table ne prend en charge que la diffusion en continu et non la requête par lots.

Syntaxe

read_pulsar ( { option_key => option_value } [, ...] )

Arguments

Cette fonction nécessite un appel de paramètre nommé pour les clés d’option.

Les options serviceUrl et topic sont obligatoires.

Les descriptions des arguments sont brèves ici. Consultez la documentation flux structuré Pulsar pour obtenir des descriptions étendues.

Option Type Default Description
serviceUrl STRING Obligatoire URI du service Pulsar.
topic STRING Obligatoire Rubrique depuis laquelle lire.
predefinedSubscription STRING Aucun Nom d’abonnement prédéfini utilisé par le connecteur pour suivre la progression de l’application Spark.
subscriptionPrefix STRING Aucun Préfixe utilisé par le connecteur pour générer un abonnement aléatoire pour suivre la progression de l’application Spark.
pollTimeoutMs LONG 120 000 Délai d’attente de lecture des messages à partir de Pulsar en millisecondes.
failOnDataLoss BOOLEAN true Contrôle l’échec d’une requête lorsque les données sont perdues (par exemple, les rubriques sont supprimées ou les messages sont supprimés en raison d’une stratégie de rétention).
startingOffsets STRING latest Point de départ lorsqu’une requête est démarrée, soit la plus ancienne, la plus récente, ou une chaîne JSON qui spécifie un décalage spécifique. S’il s’agit de la plus récente, le lecteur lit les enregistrements les plus récents après le début de son exécution. S’il s’agit de la plus ancienne, le lecteur lit à partir du décalage le plus ancien. L’utilisateur peut également spécifier une chaîne JSON qui spécifie un décalage spécifique.
startingTime STRING Aucun Quand elle est spécifiée, la source Pulsar lit les messages depuis la position du startingTime spécifiée.

Les arguments suivants sont utilisés pour l’authentification du client pulsar :

Option Type Default Description
pulsarClientAuthPluginClassName STRING Aucun Nom du plug-in d’authentification.
pulsarClientAuthParams STRING Aucun Paramètres du plug-in d’authentification.
pulsarClientUseKeyStoreTls STRING Aucun Indique s’il faut utiliser KeyStore pour l’authentification tls.
pulsarClientTlsTrustStoreType STRING Aucun Type de fichier TrustStore pour l’authentification tls.
pulsarClientTlsTrustStorePath STRING Aucun Chemin d’accès du fichier TrustStore pour l’authentification tls.
pulsarClientTlsTrustStorePassword STRING Aucun Mot de passe TrustStore pour l’authentification tls.

Ces arguments sont utilisés pour la configuration et l’authentification du contrôle d’admission pulsar. La configuration d’administration pulsar est requise uniquement lorsque le contrôle d’admission est activé (lorsque maxBytesPerTrigger est défini)

Option Type Default Description
maxBytesPerTrigger BIGINT Aucun Limite logicielle du nombre maximal d’octets que nous voulons traiter par microbatch. Si cette valeur est spécifiée, la valeur admin.url doit également être spécifiée.
adminUrl STRING Aucun Configuration de serviceHttpUrl Pulsar. Nécessaire uniquement lorsque maxBytesPerTrigger est spécifié.
pulsarAdminAuthPlugin STRING Aucun Nom du plug-in d’authentification.
pulsarAdminAuthParams STRING Aucun Paramètres du plug-in d’authentification.
pulsarClientUseKeyStoreTls STRING Aucun Indique s’il faut utiliser KeyStore pour l’authentification tls.
pulsarAdminTlsTrustStoreType STRING Aucun Type de fichier TrustStore pour l’authentification tls.
pulsarAdminTlsTrustStorePath STRING Aucun Chemin d’accès du fichier TrustStore pour l’authentification tls.
pulsarAdminTlsTrustStorePassword STRING Aucun Mot de passe TrustStore pour l’authentification tls.

Retours

Table des enregistrements pulsar avec le schéma suivant.

  • __key STRING NOT NULL : clé de message Pulsar.

  • value BINARY NOT NULL : valeur du message Pulsar.

    Remarque : pour les rubriques avec un schéma Avro ou JSON, au lieu de charger du contenu dans un champ de valeur binaire, le contenu sera développé pour conserver les noms de champ et les types de champ de la rubrique Pulsar.

  • __topic STRING NOT NULL : nom de la rubrique Pulsar.

  • __messageId BINARY NOT NULL : ID du message Pulsar.

  • __publishTime TIMESTAMP NOT NULL : heure de publication du message Pulsar.

  • __eventTime TIMESTAMP NOT NULL : heure de l’évènement de message Pulsar.

  • __messageProperties MAP<STRING, STRING> : propriétés du message Pulsar.

Exemples

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.