프로토콜 버퍼 읽기 및 쓰기

Azure Databricks는 Apache Spark 구조체와 프로토콜 버퍼(protobuf) 간의 직렬화 및 역직렬화에 대한 기본 지원을 제공합니다. Protobuf 지원은 Apache Spark DataFrame 변환기로 구현되며 구조적 스트리밍 또는 일괄 처리 작업에 사용할 수 있습니다.

프로토콜 버퍼를 역직렬화 및 직렬화하는 방법

Databricks Runtime 12.2 LTS 이상에서는 데이터를 직렬화하고 to_protobuf 역직렬화하는 데 사용하고 함수를 사용할 from_protobuf 수 있습니다. Protobuf serialization은 스트리밍 워크로드에서 일반적으로 사용됩니다.

protobuf 함수의 기본 구문은 읽기 및 쓰기 함수와 비슷합니다. 사용하려면 먼저 이러한 함수를 가져와야 합니다.

from_protobuf 는 이진 열을 구조체로 캐스팅하고 to_protobuf 구조체 열을 이진으로 캐스팅합니다. 인수로 지정된 스키마 레지스트리 또는 인수로 식별된 optionsdescFilePath 설명자 파일을 제공해야 합니다.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

다음 예제에서는 이진 protobuf 레코드를 from_protobuf() 처리하고 Spark SQL 구조체를 이진 protobuf로 to_protobuf()변환하는 방법을 보여 줍니다.

Confluent 스키마 레지스트리에서 protobuf 사용

Azure Databricks는 Confluent 스키마 레지스트리를 사용하여 Protobuf를 정의하도록 지원합니다.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

외부 Confluent 스키마 레지스트리에 인증

외부 Confluent 스키마 레지스트리에 인증하려면 인증 자격 증명 및 API 키를 포함하도록 스키마 레지스트리 옵션을 업데이트합니다.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity 카탈로그 볼륨에서 truststore 및 키 저장소 파일 사용

Databricks Runtime 14.3 LTS 이상에서는 Unity 카탈로그 볼륨의 truststore 및 키 저장소 파일을 사용하여 Confluent 스키마 레지스트리에 인증할 수 있습니다. 다음 예제에 따라 스키마 레지스트리 옵션을 업데이트합니다.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

설명자 파일과 함께 Protobuf 사용

컴퓨팅 클러스터에서 사용할 수 있는 protobuf 설명자 파일을 참조할 수도 있습니다. 해당 위치에 따라 파일을 읽을 수 있는 적절한 권한이 있는지 확인합니다.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 함수에서 지원되는 옵션

Protobuf 함수에서 지원되는 옵션은 다음과 같습니다.

  • 모드: Protobuf 레코드를 역직렬화하는 동안 오류가 처리되는 방법을 결정합니다. 오류는 레코드의 실제 스키마와 제공된 예상 스키마 간의 불일치를 포함하여 다양한 형식의 잘못된 형식의 레코드로 인해 발생할 수 있습니다 from_protobuf().
    • :
      • FAILFAST(기본값): 잘못된 형식의 레코드가 발생하고 작업이 실패하면 오류가 발생합니다.
      • PERMISSIVE: 형식이 잘못된 레코드에 대해 NULL이 반환됩니다. 많은 레코드가 삭제될 수 있으므로 이 옵션을 신중하게 사용합니다. 이는 원본에 있는 레코드의 작은 부분이 잘못된 경우에 유용합니다.
  • recursive.fields.max.depth: 재귀 필드에 대한 지원을 추가합니다. Spark SQL 스키마는 재귀 필드를 지원하지 않습니다. 이 옵션을 지정하지 않으면 재귀 필드가 허용되지 않습니다. Protobufs에서 재귀 필드를 지원하려면 지정된 깊이로 확장해야 합니다.
    • 값:

      • -1(기본값): 재귀 필드는 허용되지 않습니다.

      • 0: 재귀 필드가 삭제됩니다.

      • 1: 단일 수준의 재귀를 허용합니다.

      • [2-10]: 최대 10개 수준까지 여러 재귀에 대한 임계값을 지정합니다.

        값을 0보다 크게 설정하면 중첩된 필드를 구성된 깊이로 확장하여 재귀 필드를 허용합니다. 실수로 매우 큰 스키마를 만들지 않도록 10보다 큰 값은 허용되지 않습니다. Protobuf 메시지에 구성된 제한을 초과하는 깊이가 있는 경우 반환된 Spark 구조체는 재귀 제한 후에 잘립니다.

    • : 다음 재귀 필드가 있는 Protobuf를 고려합니다.

      message Person { string name = 1; Person friend = 2; }
      

      다음은 이 설정에 대해 서로 다른 값을 가진 끝 스키마를 나열합니다.

      • 1로 설정된 옵션: STRUCT<name: STRING>
      • 옵션 2로 설정: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • 3으로 설정된 옵션: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: 이 옵션을 사용하면 Protobuf Any 필드를 JSON으로 변환할 수 있습니다. 이 기능은 신중하게 사용하도록 설정해야 합니다. JSON 변환 및 처리는 비효율적입니다. 또한 JSON 문자열 필드는 Protobuf 스키마 안전성 때문에 다운스트림 처리에서 오류가 발생하기 쉽습니다.
    • 값:

      • False(기본값): 런타임에 이러한 야생카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다. 스키마 (STRUCT<type_url: STRING, value: BINARY>)가 있는 두 개의 필드가 있습니다. 기본적으로 이진 value 필드는 어떤 방식으로도 해석되지 않습니다. 그러나 이진 데이터는 실제로 일부 애플리케이션에서 작동하는 것이 편리하지 않을 수 있습니다.
      • True: 이 값을 True로 설정하면 런타임에 필드를 JSON 문자열로 변환 Any 할 수 있습니다. 이 옵션을 사용하면 이진 파일이 구문 분석되고 Protobuf 메시지가 JSON 문자열로 역직렬화됩니다.
    • : 다음과 같이 정의된 두 가지 Protobuf 형식을 고려합니다.

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      이 옵션을 사용하도록 설정하면 스키마 from_protobuf("col", messageName ="ProtoWithAny") 는 다음과 STRUCT<event_name: STRING, details: STRING>같습니다.

      런타임에 필드에 Protobuf 메시지가 포함된 Person 경우 details 반환된 값은 다음과 ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')같습니다.

    • 요구 사항:

      • 필드에 사용되는 가능한 모든 Protobuf 형식에 Any 대한 정의는 전달된 from_protobuf()Protobuf 설명자 파일에서 사용할 수 있어야 합니다.
      • Protobuf를 찾을 수 없으면 Any 해당 레코드에 대한 오류가 발생합니다.
      • 이 기능은 현재 스키마 레지스트리에서 지원되지 않습니다.
  • emit.default.values: Protobuf를 Spark 구조체로 역직렬화할 때 값이 0인 렌더링 필드를 사용하도록 설정합니다. 이 옵션은 아쉽게 사용해야 합니다. 일반적으로 의미 체계에서 이러한 미세한 차이에 의존 하는 것이 좋습니다.
      • False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는 null 것이 더 간단합니다.
      • True: 이 옵션을 사용하도록 설정하면 해당 필드가 해당 기본값으로 채워집니다.
    • : 다음과 같이 Person(age=0, middle_name="")생성된 Protobuf를 사용하여 다음 Protobuf를 고려합니다.

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • 이 옵션을 False로 설정하면 호출 from_protobuf() 후 Spark 구조체가 모두 null이 {"name": null, "age": null, "middle_name": "", "salary": null}됩니다. 두 필드(agemiddle_name)에 값이 설정되어 있더라도 Protobuf는 기본값이므로 와이어 형식으로 포함하지 않습니다.
      • 이 옵션을 True로 설정하면 호출 from_protobuf() 후 Spark 구조체는 다음과 {"name": "", "age": 0, "middle_name": "", "salary": null}같습니다. 필드가 salary 명시적으로 선언되고 optional 입력 레코드에 설정되지 않았으므로 필드가 null을 다시 기본.
  • enums.as.ints: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
      • False(기본값)
      • True: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
    • 예제: 다음 Protobuf를 고려합니다.

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      다음과 같은 Person(job = ENGINEER)Protobuf 메시지가 표시됩니다.

      • 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다 {"job": "ENGINEER"}.
      • 이 옵션을 사용하도록 설정하면 해당 Spark 구조체가 됩니다 {"job": 1}.

      이러한 필드의 스키마는 각 사례에서 다릅니다(기본 문자열이 아닌 정수). 이러한 변경은 다운스트림 테이블의 스키마에 영향을 줄 수 있습니다.

스키마 레지스트리 옵션

다음 스키마 레지스트리 옵션은 Protobuf 함수와 함께 스키마 레지스트리를 사용하는 동안 관련이 있습니다.

  • schema.registry.subject
    • Required
    • 스키마 레지스트리의 스키마 제목(예: "client-event")을 지정합니다.
  • schema.registry.address
    • Required
    • 스키마 레지스트리의 URL(예: ) https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • 선택 사항
    • 기본값: <NONE>.
    • 주체에 대한 스키마 레지스트리 항목에는 단일 proto 파일처럼 여러 Protobuf 정의가 포함될 수 있습니다. 이 옵션을 지정하지 않으면 스키마에 첫 번째 Protobuf가 사용됩니다. 항목의 첫 번째 메시지가 아닌 경우 Protobuf 메시지의 이름을 지정합니다. 예를 들어 두 개의 Protobuf 정의인 "Person" 및 "Location"이 있는 항목을 순서대로 고려해 보세요. 스트림이 "Person"이 아닌 "Location"에 해당하는 경우 이 옵션을 "위치"(또는 패키지 "com.example.protos.Location"를 포함한 전체 이름)로 설정합니다.
  • schema.registry.schema.evolution.mode
    • 기본값: "restart".
    • 지원되는 모드:
      • "restart"
      • "none"
    • 이 옵션은 스키마 진화 모드를 from_protobuf()설정합니다. 쿼리 시작 시 Spark는 지정된 주제에 대한 최신 스키마 ID를 기록합니다. 그러면 에 대한 from_protobuf()스키마가 결정됩니다. 쿼리가 시작된 후 새 스키마가 스키마 레지스트리에 게시될 수 있습니다. 들어오는 레코드에서 최신 스키마 ID가 발견되면 스키마가 변경되었음을 나타냅니다. 이 옵션은 스키마에 대한 이러한 변경을 처리하는 방법을 결정합니다.
      • restart (기본값): 최신 스키마 ID가 발견되면 트리거 UnknownFieldException 됩니다. 그러면 쿼리가 종료됩니다. Databricks는 스키마 변경 내용을 선택하기 위해 쿼리 실패 시 다시 시작하도록 워크플로를 구성하는 것이 좋습니다.
      • none: 스키마 ID 변경 내용은 무시됩니다. 최신 스키마 ID가 있는 레코드는 쿼리 시작 시 관찰된 것과 동일한 스키마로 구문 분석됩니다. 최신 Protobuf 정의는 이전 버전과 호환될 것으로 예상되며 새 필드는 무시됩니다.
  • confluent.schema.registry.<schema-registy-client-option>
    • 선택 사항
    • 스키마 레지스트리는 Confluent 스키마 레지스트리 클라이언트사용하여 Confluent 스키마 레지스트리에 연결합니다. 클라이언트에서 지원하는 모든 구성 옵션은 "confluent.schema.registry" 접두사를 사용하여 지정할 수 있습니다. 예를 들어 다음 두 설정은 "USER_INFO" 인증 자격 증명을 제공합니다.
      • "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"