Tampons de protocole de lecture et d’écriture

Azure Databricks fournit une prise en charge native de la sérialisation et de la désérialisation entre les structures (struct) Apache Spark et les tampons de protocole (protobuf). La prise en charge de protobuf est implémentée sous forme de transformateur DataFrame Apache Spark et s’utilise avec Structured Streaming ou pour des opérations par lots.

Comment désérialiser et sérialiser des tampons de protocole

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser les fonctions from_protobuf et to_protobuf pour sérialiser et désérialiser des données. La sérialisation protobuf est couramment utilisée dans les charges de travail de streaming.

La syntaxe de base des fonctions protobuf est similaire pour les fonctions de lecture et d’écriture. Vous devez importer ces fonctions avant de les utiliser.

from_protobuf caste une colonne binaire en struct et to_protobuf caste une colonne struct en binaire. Vous devez fournir un registre de schémas spécifié avec l’argument options ou bien un fichier descripteur identifié par l’argument descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Les exemples suivants illustrent le traitement d’enregistrements protobuf binaires avec from_protobuf() et la conversion d’un struct Spark SQL en protobuf binaire avec to_protobuf().

Utiliser protobuf avec le registre de schémas Confluent

Azure Databricks prend en charge l’utilisation du Confluent Schema Registry pour définir Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

S’authentifier auprès d’un registre de schémas Confluent externe

Pour vous authentifier auprès d’un registre de schémas Confluent externe, mettez à jour les options du registre de schémas afin d’inclure les clés API et les informations d’identification pour l’authentification.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Utiliser les fichiers truststore et keystore (magasin de clés) dans les volumes Unity Catalog

Dans Databricks Runtime 14.3 LTS et versions ultérieures, vous pouvez utiliser des fichiers truststore et keystore dans les volumes Unity Catalog pour vous authentifier auprès de Confluent Schema Registry. Mettez à jour les options de votre registre de schémas en fonction de l’exemple suivant :

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Utiliser protobuf avec un fichier descripteur

Vous pouvez également référencer un fichier de descripteur protobuf qui est disponible pour votre cluster de calcul. Vérifiez que vous disposez des autorisations appropriées pour lire le fichier, en fonction de son emplacement.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Options prises en charge dans les fonctions Protobuf

Les options suivantes sont prises en charge dans les fonctions Protobuf.

  • mode : détermine comment les erreurs lors de la désérialisation des enregistrements Protobuf sont gérées. Les erreurs peuvent être causées par différents types d’enregistrements mal formés, y compris une incompatibilité entre le schéma réel de l’enregistrement et le schéma attendu fourni dans from_protobuf().
    • Valeurs :
      • FAILFAST(valeur par défaut) : une erreur est générée lorsqu’un enregistrement mal formé est rencontré et que la tâche échoue.
      • PERMISSIVE : une valeur NULL est retournée pour les enregistrements mal formés. Utilisez cette option avec soin, car elle peut entraîner la suppression de nombreux enregistrements. Cela est utile lorsqu’une petite fraction des enregistrements de la source est incorrecte.
  • recursive.fields.max.depth : ajoute la prise en charge des champs récursifs. Les schémas SQL Spark ne prennent pas en charge les champs récursifs. Lorsque cette option n’est pas spécifiée, les champs récursifs ne sont pas autorisés. Pour prendre en charge les champs récursifs dans Protobufs, ils doivent être étendus à une profondeur spécifiée.
    • Valeurs :

      • -1 (valeur par défaut) : les champs récursifs ne sont pas autorisés.

      • 0 : Les champs récursifs sont supprimés.

      • 1 : Autorise un seul niveau de récursivité.

      • [2-10] : spécifiez un seuil pour plusieurs récursivités, jusqu’à 10 niveaux.

        La définition d’une valeur supérieure à 0 autorise les champs récursifs en développant les champs imbriqués à la profondeur configurée. Les valeurs supérieures à 10 ne sont pas autorisées pour éviter de créer par inadvertance des schémas très volumineux. Si un message Protobuf a une profondeur supérieure à la limite configurée, le struct Spark retourné est tronqué après la limite de récursivité.

    • Exemple : considérez un protobuf avec le champ récursif suivant :

      message Person { string name = 1; Person friend = 2; }
      

      Les listes suivantes répertorient le schéma de fin avec différentes valeurs pour ce paramètre :

      • Option définie sur 1 : STRUCT<name: STRING>
      • Option définie sur 2 : STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Option définie sur 3 : STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json : cette option permet de convertir protobuf tous les champs en JSON. Cette fonctionnalité doit être activée avec soin. La conversion et le traitement JSON sont inefficaces. En outre, le champ de chaîne JSON perd la sécurité du schéma Protobuf, ce qui rend le traitement en aval susceptible d’erreurs.
    • Valeurs :

      • False (valeur par défaut) : au moment de l’exécution, ces champs à caractère générique peuvent contenir des messages Protobuf arbitraires en tant que données binaires. Par défaut, ces champs sont gérés comme un message Protobuf normal. Il comporte deux champs avec le schéma (STRUCT<type_url: STRING, value: BINARY>). Par défaut, le champ de value binaire n’est interprété d’aucune manière. Toutefois, les données binaires peuvent ne pas être pratiques pour fonctionner dans certaines applications.
      • True : La définition de cette valeur sur True permet de convertir des champs Any en chaînes JSON au moment de l’exécution. Avec cette option, le binaire est analysé et le message Protobuf est désérialisé dans une chaîne JSON.
    • Exemple : considérez deux types Protobuf définis comme suit :

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Avec cette option activée, le schéma de from_protobuf("col", messageName ="ProtoWithAny") serait : STRUCT<event_name: STRING, details: STRING>.

      Au moment de l’exécution, si details champ contient Person message Protobuf, la valeur retournée ressemble à ceci : ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Conditions requises :

      • Les définitions de tous les types Protobuf possibles utilisés dans Any champs doivent être disponibles dans le fichier descripteur Protobuf transmis à from_protobuf().
      • Si Any Protobuf est introuvable, une erreur s’affiche pour cet enregistrement.
      • Cette fonctionnalité n’est actuellement pas prise en charge par le registre des schémas.
  • emit.default.values : active le rendu des champs avec zéro valeur lors de la désérialisation de Protobuf sur un struct Spark. Cette option doit être utilisée avec parcimonie. Il est généralement déconseillé de dépendre de ces différences plus fines dans la sémantique.
    • Valeurs

      • False (valeur par défaut) : lorsqu’un champ est vide dans le Protobuf sérialisé, le champ résultant du struct Spark est par défaut null. Il est plus simple de ne pas activer cette option et de traiter null comme valeur par défaut.
      • True : lorsque cette option est activée, ces champs sont remplis avec les valeurs par défaut correspondantes.
    • Exemple : considérez le protobuf suivant avec le protobuf construit comme Person(age=0, middle_name="") :

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Avec cette option définie sur False, le struct Spark après l’appel de from_protobuf() est toutes les valeurs null : {"name": null, "age": null, "middle_name": "", "salary": null}. Même si deux champs (age et middle_name) avaient des valeurs définies, Protobuf ne les inclut pas au format filaire, car ils sont des valeurs par défaut.
      • Si cette option est réglée sur True, la structure Spark après l’appel from_protobuf() serait : {"name": "", "age": 0, "middle_name": "", "salary": null}. Le champ salary reste null, car il est explicitement déclaré optional et n’est pas défini dans l’enregistrement d’entrée.
  • enums.as.ints : lorsque cette option est activée, les champs d’énumération dans Protobuf sont rendus sous forme de champs entiers dans Spark.
    • Valeurs

      • False (valeur par défaut)
      • True : Lorsque cette option est activée, les champs de type enum dans Protobuf sont rendus comme des champs de type entier dans Spark.
    • Par exemple : considérez le Protobuf suivant :

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Étant donné un message Protobuf tel que Person(job = ENGINEER) :

      • Avec cette option désactivée, le struct Spark correspondant serait {"job": "ENGINEER"}.
      • Avec cette option activée, le struct Spark correspondant serait {"job": 1}.

      Notez que le schéma de ces champs est différent dans chaque cas (entier plutôt que chaîne par défaut). Une telle modification peut affecter le schéma des tables en aval.

Options de Registre de schémas

Les options de Registre de schéma suivantes sont pertinentes lors de l’utilisation du Registre de schémas avec les fonctions Protobuf.

  • schema.registry.subject
    • Requis
    • Spécifie l’objet du schéma dans le Registre de schémas, tel que « l’événement client »
  • schema.registry.address
    • Requis
    • URL du registre des schémas, par exemple https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Facultatif
    • Par défaut : <NONE>.
    • Une entrée de registre de schémas pour un objet peut contenir plusieurs définitions Protobuf, comme un seul fichier proto. Lorsque cette option n’est pas spécifiée, le premier Protobuf est utilisé pour le schéma. Spécifiez le nom du message Protobuf lorsqu’il n’est pas le premier dans l’entrée. Par exemple, considérez une entrée avec deux définitions Protobuf : « Personne » et « Emplacement » dans cet ordre. Si le flux correspond à « l’emplacement » plutôt qu’à « la personne », définissez cette option sur « Emplacement » (ou son nom complet, y compris le package “com.example.protos.Location”).
  • schema.registry.schema.evolution.mode
    • Par défaut : « redémarrez ».
    • Modes pris en charge :
      • « redémarrer »
      • « Aucun »
    • Cette option définit le mode d’évolution du schéma pour from_protobuf(). Au début d’une requête, Spark enregistre le dernier ID de schéma pour l’objet donné. Cela détermine le schéma de from_protobuf(). Un nouveau schéma peut être publié dans le Registre de schémas après le démarrage de la requête. Lorsqu’un ID de schéma plus récent est remarqué dans un enregistrement entrant, il indique une modification du schéma. Cette option détermine la façon dont une telle modification du schéma est gérée :
      • redémarrer (valeur par défaut) : déclenche un UnknownFieldException lorsqu’un ID de schéma plus récent est remarqué. Cette opération met fin à la requête. Databricks recommande de configurer des flux de travail pour redémarrer en cas d’échec de la requête pour récupérer les modifications de schéma.
      • aucun : les modifications de l’ID de schéma sont ignorées. Les enregistrements avec un id de schéma plus récent sont analysés avec le même schéma que celui observé au début de la requête. Les définitions Protobuf plus récentes sont censées être compatibles descendantes et de nouveaux champs sont ignorés.
  • confluent.schema.registry.<schema-registy-client-option>
    • Facultatif
    • Le registre de schémas se connecte au Registre de schémas Confluent à l’aide du client de registre de schémas confluent. Toutes les options de configuration prises en charge par le client peuvent être spécifiées avec le préfixe « confluent.schema.registry ». Par exemple, les deux paramètres suivants fournissent « USER_INFO » informations d’identification d’authentification :
      • « confluent.schema.registry.basic.auth.credentials.source » : « USER_INFO »
      • « confluent.schema.registry.basic.auth.credentials.source » : « <KEY> : <SECRET> »