Aracılığıyla paylaş


Okuma ve yazma protokolü arabellekleri

Azure Databricks, Apache Spark yapıları ve protokol arabellekleri (protobuf) arasında serileştirme ve seri durumdan çıkarma için yerel destek sağlar. Protobuf desteği bir Apache Spark DataFrame transformatörü olarak uygulanır ve Yapılandırılmış Akış ile veya toplu işlemler için kullanılabilir.

Protokol arabelleklerini seri durumdan çıkarma ve serileştirme

Databricks Runtime 12.2 LTS ve üzerinde, ve işlevlerini kullanarak from_protobufto_protobuf verileri seri hale getirip seri durumdan çıkarabilirsiniz. Protobuf serileştirmesi genellikle akış iş yüklerinde kullanılır.

Protobuf işlevlerinin temel söz dizimi, okuma ve yazma işlevleri için benzerdir. Kullanmadan önce bu işlevleri içeri aktarmanız gerekir.

from_protobuf bir ikili sütunu bir yapıya dönüştürür ve to_protobuf bir yapı sütununu ikiliye dönüştürür. Bağımsız değişkenle options belirtilen bir şema kayıt defteri veya bağımsız değişken tarafından descFilePath tanımlanan bir tanımlayıcı dosyası sağlamanız gerekir.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Aşağıdaki örneklerde, ile from_protobuf() ikili protobuf kayıtlarının işlenmesi ve Spark SQL yapısı ile ikili protobuf'a dönüştürülmesi gösterilmektedir to_protobuf().

Confluent Schema Registry ile protobuf kullanma

Azure Databricks, Protobuf tanımlamak için Confluent Schema Registry'nin kullanılmasını destekler.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Dış Confluent Schema Registry'de kimlik doğrulaması

Dış Confluent Schema Registry'de kimlik doğrulaması yapmak için şema kayıt defteri seçeneklerinizi kimlik doğrulama kimlik bilgilerini ve API anahtarlarını içerecek şekilde güncelleştirin.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity Kataloğu birimlerinde truststore ve keystore dosyalarını kullanma

Databricks Runtime 14.3 LTS ve üzerinde, Confluent Schema Registry'de kimlik doğrulaması yapmak için Unity Kataloğu birimlerindeki truststore ve keystore dosyalarını kullanabilirsiniz. Şema kayıt defteri seçeneklerinizi aşağıdaki örneğe göre güncelleştirin:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Açıklayıcı dosyayla Protobuf kullanma

İşlem kümenizde kullanılabilen bir protobuf tanımlayıcı dosyasına da başvurabilirsiniz. Konumuna bağlı olarak dosyayı okumak için uygun izinlere sahip olduğunuzdan emin olun.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf işlevlerinde desteklenen seçenekler

Protobuf işlevlerinde aşağıdaki seçenekler desteklenir.

  • modu: Protobuf kayıtlarını seri durumdan çıkarırken oluşan hataların nasıl işleneceğini belirler. Hataların nedeni, kaydın gerçek şeması ile içinde from_protobuf()sağlanan beklenen şema arasındaki uyuşmazlık da dahil olmak üzere çeşitli hatalı biçimlendirilmiş kayıt türleri olabilir.
    • Değerler:
      • FAILFAST(varsayılan): Hatalı biçimlendirilmiş bir kayıtla karşılaşıldığında ve görev başarısız olduğunda bir hata oluşur.
      • PERMISSIVE: Hatalı biçimlendirilmiş kayıtlar için NULL döndürülür. Birçok kaydın düşmesine neden olabileceğinden bu seçeneği dikkatli kullanın. Bu, kaynaktaki kayıtların küçük bir bölümü yanlış olduğunda kullanışlıdır.
  • recursive.fields.max.depth: Özyinelemeli alanlar için destek ekler. Spark SQL şemaları özyinelemeli alanları desteklemez. Bu seçenek belirtilmediğinde özyinelemeli alanlara izin verilmez. Protobufs'ta özyinelemeli alanları desteklemek için belirtilen derinliğe genişletilmeleri gerekir.
    • Değerler:

      • -1 (varsayılan): Özyinelemeli alanlara izin verilmez.

      • 0: Özyinelemeli alanlar bırakılır.

      • 1: Tek bir özyineleme düzeyine izin verir.

      • [2-10]: En fazla 10 düzey olmak üzere birden çok özyineleme için bir eşik belirtin.

        Değerin 0'dan büyük olarak ayarlanması, iç içe alanları yapılandırılan derinliğe genişleterek özyinelemeli alanlara olanak tanır. Çok büyük şemaların yanlışlıkla oluşturulmasını önlemek için 10'dan büyük değerlere izin verilmez. Protobuf iletisinde yapılandırılmış sınırın ötesinde derinlik varsa, döndürülen Spark yapısı özyineleme sınırından sonra kesilir.

    • Örnek: Aşağıdaki özyinelemeli alana sahip bir Protobuf düşünün:

      message Person { string name = 1; Person friend = 2; }
      

      Bu ayar için farklı değerlere sahip bitiş şeması aşağıda listelenmiştir:

      • Seçenek 1 olarak ayarlandı: STRUCT<name: STRING>
      • Seçenek 2 olarak ayarlanır: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Seçenek 3 olarak ayarlandı: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Bu seçenek Protobuf Any alanlarınıN JSON'a dönüştürülmesini sağlar. Bu özellik dikkatli bir şekilde etkinleştirilmelidir. JSON dönüştürme ve işleme verimsizdir. Buna ek olarak, JSON dize alanı Protobuf şema güvenliğini kaybeder ve aşağı akış işleme hatalara eğilimli olur.
    • Değerler:

      • False (varsayılan): Çalışma zamanında, bu tür joker alanlar ikili veri olarak rastgele Protobuf iletileri içerebilir. Varsayılan olarak bu tür alanlar normal bir Protobuf iletisi gibi işlenir. şemasına (STRUCT<type_url: STRING, value: BINARY>)sahip iki alanı vardır. Varsayılan olarak, ikili value alan hiçbir şekilde yorumlanmaz. Ancak ikili veriler bazı uygulamalarda çalışmak için pratikte uygun olmayabilir.
      • Doğru: Bu değerin True olarak ayarlanması, çalışma zamanında alanların JSON dizelerine dönüştürülmesini Any sağlar. Bu seçenekle ikili ayrıştırılır ve Protobuf iletisi bir JSON dizesi olarak seri durumdan çıkarılır.
    • Örnek: Aşağıdaki gibi tanımlanan iki Protobuf türünü göz önünde bulundurun:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Bu seçenek etkinleştirildiğinde şeması from_protobuf("col", messageName ="ProtoWithAny") şöyle olur: STRUCT<event_name: STRING, details: STRING>.

      Çalışma zamanında, details alan Protobuf iletisi içeriyorsa Person , döndürülen değer şöyle görünür: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Gereksinimler:

      • Alanlarda kullanılan Any tüm olası Protobuf türlerinin tanımları, sürümüne from_protobuf()geçirilen Protobuf tanımlayıcı dosyasında kullanılabilir olmalıdır.
      • Protobuf bulunamazsa Any , bu kayıt için bir hataya neden olur.
      • Bu özellik şu anda schema-registry ile desteklenmiyor.
  • emit.default.values: Protobuf bir Spark yapısında seri durumdan çıkarılırken alanları sıfır değerle işlemeyi etkinleştirir. Bu seçenek tedbirli kullanılmalıdır. Semantikteki bu kadar ince farklılıklara bağlı olmak genellikle tavsiye edilmez.
    • Değerler

      • False (varsayılan): Seri hale getirilmiş Protobuf'ta bir alan boş olduğunda Spark yapısındaki sonuçta elde edilen alan varsayılan olarak null olur. Bu seçeneği etkinleştirmemek ve varsayılan değer olarak işlemek null daha kolaydır.
      • Doğru: Bu seçenek etkinleştirildiğinde, bu tür alanlar karşılık gelen varsayılan değerlerle doldurulur.
    • Örnek: Aşağıdaki Protobuf'un aşağıdaki gibi Person(age=0, middle_name="")oluşturulmuş Protobuf'unu göz önünde bulundurun:

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Bu seçenek False olarak ayarlandığında, çağrıdan from_protobuf() sonra Spark yapısı tamamen null olur: {"name": null, "age": null, "middle_name": "", "salary": null}. İki alanın (age ve middle_name) değerleri ayarlanmış olsa da, Protobuf bunları tel biçiminde içermez çünkü bunlar varsayılan değerlerdir.
      • Bu seçenek True olarak ayarlandığında, çağrıdan from_protobuf() sonra Spark yapısı şöyle olur: {"name": "", "age": 0, "middle_name": "", "salary": null}. Alan salary , açıkça bildirildiğinden optional ve giriş kaydında ayarlanmadığından null olarak kalır.
  • enums.as.ints: Etkinleştirildiğinde, Protobuf'taki sabit listesi alanları Spark'ta tamsayı alanları olarak işlenir.
    • Değerler

      • False (varsayılan)
      • Doğru: Etkinleştirildiğinde, Protobuf'taki sabit listesi alanları Spark'ta tamsayı alanları olarak işlenir.
    • Örnek: Aşağıdaki Protobuf'a göz önünde bulundurun:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Gibi Person(job = ENGINEER)bir Protobuf iletisi verilir:

      • Bu seçenek devre dışı bırakılmıştır, buna karşılık gelen Spark yapısı olacaktır {"job": "ENGINEER"}.
      • Bu seçenek etkinleştirildiğinde ilgili Spark yapısı olacaktır {"job": 1}.

      Bu alanların şemasının her durumda farklı olduğuna dikkat edin (varsayılan dize yerine tamsayı). Böyle bir değişiklik aşağı akış tablolarının şemasını etkileyebilir.

Şema Kayıt Defteri Seçenekleri

Aşağıdaki şema kayıt defteri seçenekleri, Protobuf işlevleriyle şema kayıt defteri kullanılırken geçerlidir.

  • schema.registry.subject
    • Zorunlu
    • Schema Registry'de şemanın konusunu belirtir, örneğin "client-event"
  • schema.registry.address
    • Zorunlu
    • Şema kayıt defteri URL'si, örneğin https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • İsteğe bağlı
    • Varsayılan: <NONE>.
    • Bir konu için şema kayıt defteri girdisi, aynı tek proto bir dosya gibi birden çok Protobuf tanımı içerebilir. Bu seçenek belirtilmediğinde, şema için ilk Protobuf kullanılır. Girişteki ilk ileti olmadığında Protobuf iletisinin adını belirtin. Örneğin, iki Protobuf tanımına sahip bir giriş düşünün: bu sırada "Kişi" ve "Konum". Akış "Kişi" yerine "Konum"a karşılık geliyorsa, bu seçeneği "Konum" (veya "com.example.protos.Location" paketi dahil tam adı) olarak ayarlayın.
  • schema.registry.schema.evolution.mode
    • Varsayılan: "yeniden başlat".
    • Desteklenen modlar:
      • "yeniden başlat"
      • "hiçbiri"
    • Bu seçenek için from_protobuf()şema evrim modunu ayarlar. Sorgunun başlangıcında Spark, verilen konu için en son şema kimliğini kaydeder. Bu, için from_protobuf()şemayı belirler. Sorgu başlatıldıktan sonra şema kayıt defterinde yeni bir şema yayımlanabilir. Gelen kayıtta daha yeni bir şema kimliği fark edildiğinde şemada bir değişiklik olduğunu gösterir. Bu seçenek, şemada böyle bir değişikliğin nasıl işleneceğini belirler:
      • yeniden başlatma (varsayılan): Daha yeni bir schema-id fark edildiğinde bir UnknownFieldException tetikler. Bu işlem sorguyu sonlandırır. Databricks, şema değişikliklerini almak için sorgu hatasında iş akışlarını yeniden başlatacak şekilde yapılandırmanızı önerir.
      • none: Şema kimliği değişiklikleri yoksayılır. Daha yeni schema-id'ye sahip kayıtlar, sorgunun başında gözlemlenen şemayla ayrıştırılır. Daha yeni Protobuf tanımlarının geriye dönük uyumlu olması beklenir ve yeni alanlar yoksayılır.
  • confluent.schema.registry.<schema-registy-client-option>
    • İsteğe bağlı
    • Schema-registry, Confluent Schema Registry istemcisini kullanarak Confluent schema-registry'ye bağlanır. İstemci tarafından desteklenen tüm yapılandırma seçenekleri "confluent.schema.registry" ön eki ile belirtilebilir. Örneğin, aşağıdaki iki ayar "USER_INFO" kimlik doğrulaması kimlik bilgilerini sağlar:
      • "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"