Protocol Buffers (protobuf) is a language-neutral binary serialization format developed by Google. Azure Databricks users most commonly encounter it when processing binary-encoded records from event streaming systems such as Apache Kafka. Azure Databricks supports reading and writing protobuf data with Apache Spark through the from_protobuf and to_protobuf functions, which convert between binary protobuf and Spark SQL struct types for both streaming and batch workloads.
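On the wire, a protobuf message carries field numbers, wire types, and values, but no field names. That is why both functions need a schema. The pure-Python sketch below makes "binary-encoded record" concrete by hand-encoding two fields according to the published protobuf wire-format rules. The field numbers and values are arbitrary. In practice, to_protobuf produces these bytes for you.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        byte = value & 0x7F  # low 7 bits of the remaining value
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_varint_field(field_number: int, value: int) -> bytes:
    """Encode a field with wire type 0 (VARINT): key, then value."""
    key = (field_number << 3) | 0
    return encode_varint(key) + encode_varint(value)


def encode_string_field(field_number: int, text: str) -> bytes:
    """Encode a field with wire type 2 (LEN): key, byte length, UTF-8 bytes."""
    data = text.encode("utf-8")
    key = (field_number << 3) | 2
    return encode_varint(key) + encode_varint(len(data)) + data


# Field 1 = 150 and field 2 = "testing"; no field names appear in the bytes.
record = encode_varint_field(1, 150) + encode_string_field(2, "testing")
print(record.hex(" "))  # 08 96 01 12 07 74 65 73 74 69 6e 67
```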
Prerequisites
Protobuf functions require Databricks Runtime 12.2 LTS and above.
Function syntax
Use from_protobuf to cast a binary column to a struct, and to_protobuf to cast a struct column to binary. You must provide either a descriptor file identified by the descFilePath argument or a schema registry specified with the options argument. For a complete list of options, see Protobuf.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
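Because you must supply either a descriptor file or schema registry options, you might check the arguments before you build a query. The helper below is hypothetical and is not part of the Databricks API. It returns keyword arguments named after the Python signature above (messageName, descFilePath, options) and rejects calls that supply neither source. Two behaviors are assumptions of this sketch. First, it treats "schema registry options" as a dictionary that contains schema.registry.subject and schema.registry.address, based on the examples on this page. Second, it conservatively rejects calls that supply both sources.

```python
from typing import Dict, Mapping, Optional

# Keys that the schema registry examples on this page always set (assumption).
REGISTRY_KEYS = ("schema.registry.subject", "schema.registry.address")


def protobuf_call_kwargs(
    message_name: Optional[str] = None,
    desc_file_path: Optional[str] = None,
    options: Optional[Mapping[str, str]] = None,
) -> Dict[str, object]:
    """Hypothetical helper: build kwargs for from_protobuf / to_protobuf."""
    opts = dict(options or {})
    uses_registry = all(key in opts for key in REGISTRY_KEYS)

    if desc_file_path is not None and uses_registry:
        raise ValueError("pass either a descriptor file or schema registry options, not both")
    if desc_file_path is not None:
        # A descriptor file can define several messages, so the name is required.
        if not message_name:
            raise ValueError("messageName is required when using a descriptor file")
        kwargs: Dict[str, object] = {"messageName": message_name, "descFilePath": desc_file_path}
        if opts:
            kwargs["options"] = opts  # other (non-registry) options pass through
        return kwargs
    if uses_registry:
        return {"options": opts}
    raise ValueError("provide descFilePath or schema registry options")
```

For example, you could call `from_protobuf("proto_bytes", **protobuf_call_kwargs("Review", descriptor_file))`.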
Options
Pass options to from_protobuf and to_protobuf using the options argument. For a complete list of supported options, see Protobuf.
Schema Registry options
The following options are specific to schema registry usage and are not covered in the general options reference.
| Option | Required | Default | Description |
|---|---|---|---|
| schema.registry.schema.evolution.mode | No | "restart" | How to handle schema changes when a newer schema ID is detected in an incoming record. "restart" terminates the query with an UnknownFieldException; configure jobs to restart on failure so they pick up the new schema. "none" ignores schema ID changes and parses newer records with the original schema. |
| confluent.schema.registry.<option> | No | None | Passes any Confluent Schema Registry client option when you add the prefix "confluent.schema.registry." to the option name. For example, set "confluent.schema.registry.basic.auth.credentials.source" to "USER_INFO" and "confluent.schema.registry.basic.auth.user.info" to "<KEY>:<SECRET>" to configure basic authentication. |
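Every option value is a string, and all Confluent client options share one prefix, so a small builder can keep the option names consistent. The function below is a hypothetical sketch. It sets the subject and address keys used in the examples on this page and can validate schema.registry.schema.evolution.mode against the two values in the table. It also adds the confluent.schema.registry. prefix to any client option that omits it.

```python
from typing import Dict, Mapping, Optional

CONFLUENT_PREFIX = "confluent.schema.registry."
EVOLUTION_MODES = {"restart", "none"}  # the two values listed in the table


def build_schema_registry_options(
    subject: str,
    address: str,
    evolution_mode: Optional[str] = None,
    client_options: Optional[Mapping[str, object]] = None,
) -> Dict[str, str]:
    """Hypothetical helper that returns a dict for the options argument."""
    opts = {
        "schema.registry.subject": subject,
        "schema.registry.address": address,
    }
    if evolution_mode is not None:
        if evolution_mode not in EVOLUTION_MODES:
            raise ValueError(f"unsupported evolution mode: {evolution_mode!r}")
        opts["schema.registry.schema.evolution.mode"] = evolution_mode
    for key, value in (client_options or {}).items():
        # Add the Confluent prefix unless the caller already included it.
        full_key = key if key.startswith(CONFLUENT_PREFIX) else CONFLUENT_PREFIX + key
        opts[full_key] = str(value)  # option values are passed as strings
    return opts


options = build_schema_registry_options(
    "app-events-value",
    "https://schema-registry:8081/",
    evolution_mode="none",
    client_options={"basic.auth.credentials.source": "USER_INFO"},
)
```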
Usage
The following examples use the Wanderbricks dataset to demonstrate serializing Apache Spark structs to binary protobuf with to_protobuf() and deserializing binary protobuf records with from_protobuf().
Use protobuf with Confluent Schema Registry
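Azure Databricks supports using the Confluent Schema Registry to define the Protobuf schema. Instead of passing a descriptor file, set schema.registry.subject and schema.registry.address in the options argument.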
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
schema_registry_options = {
    "schema.registry.subject": "app-events-value",
    "schema.registry.address": "https://schema-registry:8081/"
}
# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._
import spark.implicits._ // provides the $"column" syntax
val schemaRegistryOptions = Map(
  "schema.registry.subject" -> "app-events-value",
  "schema.registry.address" -> "https://schema-registry:8081/"
)
// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
  to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
    .as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
  from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
    .as("proto_event")
)
reviewsRestoredDF.show()
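The schema.registry.schema.evolution.mode option reacts to the schema ID that each incoming record carries. Producers that use Confluent's Protobuf serializer frame each value in four parts: a magic byte 0, a 4-byte big-endian schema ID, a list of zig-zag varint message indexes, and then the protobuf payload. In the index list, a single 0 byte stands for the first message in the schema. The plain-Python sketch below parses that header, which can help when you inspect raw Kafka values while debugging. It illustrates the Confluent wire format only and does not describe how from_protobuf is implemented.

```python
from typing import List, Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read an unsigned base-128 varint; return (value, next position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def zigzag_decode(n: int) -> int:
    """Map zig-zag encoded values back to signed ints (0->0, 1->-1, 2->1, ...)."""
    return (n >> 1) ^ -(n & 1)


def parse_confluent_protobuf_header(record: bytes) -> Tuple[int, List[int], bytes]:
    """Split a Confluent-framed record into (schema ID, message indexes, payload)."""
    if len(record) < 6 or record[0] != 0:
        raise ValueError("record does not start with the Confluent magic byte 0")
    schema_id = int.from_bytes(record[1:5], "big")  # 4-byte big-endian schema ID
    raw_count, pos = read_varint(record, 5)
    count = zigzag_decode(raw_count)
    if count == 0:
        indexes = [0]  # shorthand for the first message type in the schema
    else:
        indexes = []
        for _ in range(count):
            raw, pos = read_varint(record, pos)
            indexes.append(zigzag_decode(raw))
    return schema_id, indexes, record[pos:]
```

If the schema ID changes between records while evolution mode is "restart", the table above says the query stops with an UnknownFieldException.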
Authenticate to an external Confluent Schema Registry
To authenticate to an external Confluent Schema Registry, add the Confluent client's basic authentication options to your schema registry options. Set the user info to your Confluent API key and secret in the form <KEY>:<SECRET>.
Python
schema_registry_options = {
    "schema.registry.subject": "app-events-value",
    "schema.registry.address": "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
  "schema.registry.subject" -> "app-events-value",
  "schema.registry.address" -> "https://remote-schema-registry-endpoint",
  "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
  "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
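The examples above hardcode the key and secret as confluentApiKey:confluentApiSecret. The sketch below builds the same two options from values supplied at runtime instead. The environment variable names CONFLUENT_API_KEY and CONFLUENT_API_SECRET are hypothetical. In a Databricks notebook, you would more likely read the values from a secret scope with dbutils.secrets.get(scope=..., key=...); environment variables stand in here so that the sketch runs anywhere.

```python
import os
from typing import Dict, Mapping, Optional


def confluent_basic_auth_options(api_key: str, api_secret: str) -> Dict[str, str]:
    """Build the two Confluent basic-auth client options from a key and secret."""
    if not api_key or not api_secret:
        raise ValueError("both the API key and the API secret must be non-empty")
    return {
        "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
        "confluent.schema.registry.basic.auth.user.info": f"{api_key}:{api_secret}",
    }


def registry_options_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical: read credentials from CONFLUENT_API_KEY / CONFLUENT_API_SECRET."""
    env = os.environ if env is None else env
    return {
        "schema.registry.subject": "app-events-value",
        "schema.registry.address": "https://remote-schema-registry-endpoint",
        **confluent_basic_auth_options(
            env.get("CONFLUENT_API_KEY", ""), env.get("CONFLUENT_API_SECRET", "")
        ),
    }
```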
Use truststore and keystore files in Unity Catalog volumes
In Databricks Runtime 14.3 LTS and above, you can use truststore and keystore files in Unity Catalog volumes to authenticate to a Confluent Schema Registry. Update your schema registry options according to the following example:
Python
schema_registry_options = {
    "schema.registry.subject": "app-events-value",
    "schema.registry.address": "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location": "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password": "<password>",
    "confluent.schema.registry.ssl.keystore.location": "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password": "<password>",
    "confluent.schema.registry.ssl.key.password": "<password>"
}
Scala
val schemaRegistryOptions = Map(
  "schema.registry.subject" -> "app-events-value",
  "schema.registry.address" -> "https://remote-schema-registry-endpoint",
  "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
  "confluent.schema.registry.ssl.truststore.password" -> "<password>",
  "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
  "confluent.schema.registry.ssl.keystore.password" -> "<password>",
  "confluent.schema.registry.ssl.key.password" -> "<password>"
)
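The truststore and keystore paths above follow the /Volumes/<catalog_name>/<schema_name>/<volume_name>/<file> layout. The sketch below assembles the same five SSL options from their parts so that the two paths cannot drift apart. The helper names are hypothetical. It also assumes that both JKS files live in the same volume under the file names used above.

```python
from typing import Dict


def volume_path(catalog: str, schema: str, volume: str, filename: str) -> str:
    """Build a /Volumes/<catalog>/<schema>/<volume>/<file> path."""
    for part in (catalog, schema, volume):
        if not part or "/" in part:
            raise ValueError(f"invalid path component: {part!r}")
    return f"/Volumes/{catalog}/{schema}/{volume}/{filename.lstrip('/')}"


def ssl_store_options(
    catalog: str,
    schema: str,
    volume: str,
    truststore_password: str,
    keystore_password: str,
    key_password: str,
) -> Dict[str, str]:
    """Confluent SSL client options pointing at JKS files stored in one volume."""
    prefix = "confluent.schema.registry.ssl."
    return {
        prefix + "truststore.location": volume_path(catalog, schema, volume, "kafka.client.truststore.jks"),
        prefix + "truststore.password": truststore_password,
        prefix + "keystore.location": volume_path(catalog, schema, volume, "kafka.client.keystore.jks"),
        prefix + "keystore.password": keystore_password,
        prefix + "key.password": key_password,
    }
```

You can merge the result into the subject and address options, for example `{**base_options, **ssl_store_options("main", "security", "certs", ...)}`. Here the catalog, schema, and volume names are placeholders.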
Use Protobuf with a descriptor file
You can also pass a Protobuf descriptor file that your compute cluster can read, along with the name of the message to use (messageName). Make sure you have the permissions required to read the file from its location.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
descriptor_file = "/path/to/proto_descriptor.desc"
# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import spark.implicits._ // provides the $"column" syntax
val descriptorFile = "/path/to/proto_descriptor.desc"
// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
  to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
  from_protobuf($"proto_bytes", "Review", descFilePath = descriptorFile).as("review")
)
reviewsRestoredDF.show()
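A descriptor file lets from_protobuf map field numbers in the binary data back to named struct fields. The pure-Python sketch below shows that idea in miniature. A plain dictionary stands in for the descriptor, and the decoder handles only the varint and length-delimited wire types. The Review field numbers and types are hypothetical, because this page does not show the .proto definition behind the examples.

```python
from typing import Dict, Mapping, Tuple, Union

# Hypothetical stand-in for a descriptor: field numbers of a Review message.
REVIEW_FIELDS = {1: "review_id", 2: "rating", 3: "comment"}


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read an unsigned base-128 varint; return (value, next position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_message(buf: bytes, field_names: Mapping[int, str]) -> Dict[str, Union[int, str]]:
    """Decode VARINT and LEN fields; LEN values are treated as UTF-8 strings."""
    out: Dict[str, Union[int, str]] = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # VARINT, read as unsigned (no negative/sint handling)
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:  # LEN: length prefix, then that many bytes
            length, pos = read_varint(buf, pos)
            if pos + length > len(buf):
                raise ValueError("truncated length-delimited field")
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        # Field numbers missing from the map keep a placeholder name.
        out[field_names.get(field_number, f"field_{field_number}")] = value
    return out


sample = b"\x08\x96\x01" + b"\x10\x05" + b"\x1a\x05great"
print(decode_message(sample, REVIEW_FIELDS))
# {'review_id': 150, 'rating': 5, 'comment': 'great'}
```

Without the field-name map, the same bytes decode only to field_1, field_2, and field_3. This is why from_protobuf needs either a descriptor file or a schema registry.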
Additional resources
- Read and write streaming Avro data: If your streaming workload uses Avro serialization rather than Protobuf, see the Avro streaming functions for the equivalent from_avro and to_avro functions.