قراءة وكتابة المخازن المؤقتة للبروتوكول

يوفر Azure Databricks الدعم الأصلي للتسلسل وإلغاء التسلسل بين بنيات Apache Spark والمخازن المؤقتة للبروتوكول (protobuf). يتم تنفيذ دعم Protobuf كمحول Apache Spark DataFrame ويمكن استخدامه مع Structured Streaming أو لعمليات الدفعات.

كيفية إلغاء تسلسل المخازن المؤقتة للبروتوكول وتسلسلها

في Databricks Runtime 12.2 LTS وما فوق، يمكنك استخدام from_protobuf الدالات و to_protobuf لتسلسل البيانات وإلغاء تسلسلها. يتم استخدام تسلسل Protobuf بشكل شائع في أحمال العمل المتدفقة.

بناء الجملة الأساسي لدالات protobuf مشابه لدالات القراءة والكتابة. يجب استيراد هذه الدالات قبل الاستخدام.

from_protobuf تحويل عمود ثنائي إلى بنية، ثم to_protobuf تحويل عمود بنية إلى عمود ثنائي. يجب توفير إما سجل مخطط محدد مع الوسيطة options أو ملف واصف تم تعريفه بواسطة الوسيطة descFilePath .

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

توضح الأمثلة التالية معالجة سجلات protobuf الثنائية مع from_protobuf() وتحويل بنية Spark SQL إلى protobuf ثنائي باستخدام to_protobuf().

استخدام protobuf مع Confluent Schema Registry

يدعم Azure Databricks استخدام سجل مخطط Confluent لتعريف Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

المصادقة على سجل مخطط Confluent خارجي

للمصادقة على سجل مخطط Confluent خارجي، قم بتحديث خيارات سجل المخطط لتضمين بيانات اعتماد المصادقة ومفاتيح واجهة برمجة التطبيقات.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

استخدام ملفات مخزن الثقة ومخزن المفاتيح في وحدات تخزين كتالوج Unity

في Databricks Runtime 14.3 LTS وما فوق، يمكنك استخدام ملفات مخزن الثقة ومخزن المفاتيح في وحدات تخزين كتالوج Unity للمصادقة على سجل مخطط Confluent. قم بتحديث خيارات سجل المخطط وفقا للمثال التالي:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

استخدام Protobuf مع ملف واصف

يمكنك أيضا الرجوع إلى ملف واصف protobuf متوفر لمجموعة الحوسبة الخاصة بك. تأكد من أن لديك الأذونات المناسبة لقراءة الملف، استنادا إلى موقعه.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

الخيارات المدعومة في وظائف Protobuf

الخيارات التالية مدعومة في وظائف Protobuf.

  • الوضع: يحدد كيفية معالجة الأخطاء أثناء إلغاء تسلسل سجلات Protobuf. قد تكون الأخطاء ناتجة عن أنواع مختلفة من السجلات التي تم تكوينها بشكل غير صحيح بما في ذلك عدم التطابق بين المخطط الفعلي للسجل والمخطط المتوقع المتوفر في from_protobuf().
    • القيم:
      • FAILFAST(افتراضي): يتم طرح خطأ عند مواجهة سجل مشوه وفشل المهمة.
      • PERMISSIVE: يتم إرجاع NULL للسجلات التي تم تكوينها بشكل غير صحيح. استخدم هذا الخيار بعناية لأنه يمكن أن يؤدي إلى إسقاط العديد من السجلات. يكون هذا مفيدا عندما يكون جزء صغير من السجلات في المصدر غير صحيح.
  • recursive.fields.max.depth: يضيف دعما للحول المتكررة. لا تدعم مخططات Spark SQL الحقول المتكررة. عندما لا يتم تحديد هذا الخيار، لا يسمح بالحقول المتكررة. من أجل دعم الحقول المتكررة في Protobufs، يجب توسيعها إلى عمق محدد.
    • القيم:

      • -1 (افتراضي): الحقول المتكررة غير مسموح بها.

      • 0: يتم إسقاط الحقول المتكررة.

      • 1: يسمح بمستوى واحد من الإعادة.

      • [2-10]: حدد حدا للإعادة المتعددة، حتى 10 مستويات.

        يسمح تعيين قيمة إلى أكبر من 0 بالحقول المتكررة عن طريق توسيع الحقول المتداخلة إلى العمق المكون. لا يسمح بالقيم الأكبر من 10 لتجنب إنشاء مخططات كبيرة جدا عن غير قصد. إذا كانت رسالة Protobuf تحتوي على عمق يتجاوز الحد المكون، يتم اقتطاع بنية Spark التي تم إرجاعها بعد حد الإعادة.

    • مثال: ضع في اعتبارك Protobuf مع الحقل المتكرر التالي:

      message Person { string name = 1; Person friend = 2; }
      

      يسرد التالي مخطط النهاية بقيم مختلفة لهذا الإعداد:

      • تم تعيين الخيار إلى 1: STRUCT<name: STRING>
      • تم تعيين الخيار إلى 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • تم تعيين الخيار إلى 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: يتيح هذا الخيار تحويل حقول Protobuf Any إلى JSON. يجب تمكين هذه الميزة بعناية. تحويل JSON ومعالجته غير فعال. بالإضافة إلى ذلك، يفقد حقل سلسلة JSON أمان مخطط Protobuf مما يجعل المعالجة النهائية عرضة للأخطاء.
    • القيم:

      • خطأ (افتراضي): في وقت التشغيل، يمكن أن تحتوي حقول أحرف البدل هذه على رسائل Protobuf عشوائية كبيانات ثنائية. بشكل افتراضي، يتم التعامل مع مثل هذه الحقول كرسالة Protobuf عادية. يحتوي على حقلين مع مخطط (STRUCT<type_url: STRING, value: BINARY>). بشكل افتراضي، لا يتم تفسير الحقل الثنائي value بأي شكل من الأشكال. ولكن قد لا تكون البيانات الثنائية ملائمة في الممارسة العملية للعمل في بعض التطبيقات.
      • True: يؤدي تعيين هذه القيمة إلى True إلى تمكين تحويل Any الحقول إلى سلاسل JSON في وقت التشغيل. باستخدام هذا الخيار، يتم تحليل الثنائي ويتم إلغاء تسلسل رسالة Protobuf في سلسلة JSON.
    • مثال: ضع في اعتبارك نوعين من Protobuf معرفين على النحو التالي:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      مع تمكين هذا الخيار، سيكون مخطط ل from_protobuf("col", messageName ="ProtoWithAny") : STRUCT<event_name: STRING, details: STRING>.

      في وقت التشغيل، إذا كان details الحقل يحتوي على Person رسالة Protobuf، فإن القيمة التي تم إرجاعها تبدو كما يلي: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • المتطلبات:

      • يجب أن تتوفر تعريفات جميع أنواع Protobuf المحتملة المستخدمة في Any الحقول في ملف واصف Protobuf الذي تم تمريره إلى from_protobuf().
      • إذا Any لم يتم العثور على Protobuf، فسينتج عن ذلك خطأ لهذا السجل.
      • هذه الميزة غير مدعومة حاليا مع سجل المخطط.
  • emit.default.values: تمكين حقول العرض ذات القيم الصفرية عند إلغاء تسلسل Protobuf إلى بنية Spark. يجب استخدام هذا الخيار باعتدال. عادة لا ينصح بالاعتماد على مثل هذه الاختلافات الدقيقة في الدلالات.
    • القيم

      • خطأ (افتراضي): عندما يكون الحقل فارغا في Protobuf المتسلسل، يكون الحقل الناتج في بنية Spark فارغا بشكل افتراضي. من الأسهل عدم تمكين هذا الخيار والتعامل معه null كقيمة افتراضية.
      • صواب: عند تمكين هذا الخيار، يتم تعبئة هذه الحقول بالقيم الافتراضية المقابلة.
    • مثال: ضع في اعتبارك Protobuf التالي مع Protobuf الذي تم إنشاؤه مثل Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • مع تعيين هذا الخيار إلى False، ستكون بنية Spark بعد الاستدعاء from_protobuf() كلها فارغة: {"name": null, "age": null, "middle_name": "", "salary": null}. على الرغم من تعيين قيم لحقلين (age و middle_name) ، لا يتضمن Protobufهما بتنسيق سلكي نظرا لأنهما قيم افتراضية.
      • مع تعيين هذا الخيار إلى True، ستكون بنية Spark بعد الاستدعاء from_protobuf() : {"name": "", "age": 0, "middle_name": "", "salary": null}. salary يظل الحقل فارغا لأنه تم الإعلان optional عنه بشكل صريح ولم يتم تعيينه في سجل الإدخال.
  • enums.as.ints: عند التمكين، يتم عرض حقول التعداد في Protobuf كحولول عدد صحيح في Spark.
    • القيم

      • خطأ (افتراضي)
      • صواب: عند التمكين، يتم عرض حقول التعداد في Protobuf كحولول عدد صحيح في Spark.
    • مثال: ضع في اعتبارك Protobuf التالي:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      نظرا لرسالة Protobuf مثل Person(job = ENGINEER):

      • مع تعطيل هذا الخيار، ستكون {"job": "ENGINEER"}بنية Spark المقابلة .
      • مع تمكين هذا الخيار، ستكون {"job": 1}بنية Spark المقابلة .

      لاحظ أن مخطط هذه الحقول مختلف في كل حالة (عدد صحيح بدلا من سلسلة افتراضية). يمكن أن يؤثر هذا التغيير على مخطط جداول انتقال البيانات من الخادم.

خيارات سجل المخطط

خيارات سجل المخطط التالية ذات صلة أثناء استخدام سجل المخطط مع وظائف Protobuf.

  • schema.registry.subject
    • المطلوب
    • تحديد موضوع للمخطط في سجل المخطط، مثل "client-event"
  • schema.registry.address
    • المطلوب
    • عنوان URL لسجل المخطط، مثل https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • اختياري
    • افتراضي: <NONE>.
    • يمكن أن يحتوي إدخال سجل المخطط لموضوع ما على تعريفات Protobuf متعددة، تماما مثل ملف واحد proto . عندما لا يتم تحديد هذا الخيار، يتم استخدام Protobuf الأول للمخطط. حدد اسم رسالة Protobuf عندما لا تكون الأولى في الإدخال. على سبيل المثال، ضع في اعتبارك إدخالا بتعريفين Protobuf: "Person" و"Location" بهذا الترتيب. إذا كان الدفق يتوافق مع "الموقع" بدلا من "شخص"، فقم بتعيين هذا الخيار إلى "الموقع" (أو اسمه الكامل بما في ذلك الحزمة "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • الافتراضي: "إعادة التشغيل".
    • الأوضاع المدعومة:
      • "إعادة التشغيل"
      • "بلا"
    • يعين هذا الخيار وضع تطور المخطط ل from_protobuf(). في بداية استعلام، يسجل Spark أحدث معرف مخطط للموضوع المحدد. يحدد هذا المخطط ل from_protobuf(). قد يتم نشر مخطط جديد إلى سجل المخطط بعد بدء الاستعلام. عند ملاحظة معرف مخطط أحدث في سجل وارد، فإنه يشير إلى تغيير في المخطط. يحدد هذا الخيار كيفية معالجة مثل هذا التغيير في المخطط:
      • إعادة التشغيل (افتراضي): يؤدي إلى UnknownFieldException تشغيل عند ملاحظة معرف مخطط أحدث. يؤدي ذلك إلى إنهاء الاستعلام. توصي Databricks بتكوين المهام لإعادة التشغيل عند فشل الاستعلام في التقاط تغييرات المخطط.
      • بلا: يتم تجاهل تغييرات معرف المخطط. يتم تحليل السجلات ذات معرف المخطط الأحدث بنفس المخطط الذي تمت ملاحظته في بداية الاستعلام. من المتوقع أن تكون تعريفات Protobuf الأحدث متوافقة مع الإصدارات السابقة، ويتم تجاهل الحقول الجديدة.
  • confluent.schema.registry.<schema-registy-client-option>
    • اختياري
    • يتصل سجل المخطط ب Confluent schema-registry باستخدام عميل Confluent Schema Registry. يمكن تحديد أي خيارات تكوين يدعمها العميل بالبادئة "confluent.schema.registry". على سبيل المثال، يوفر الإعدادان التاليان بيانات اعتماد المصادقة "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"