프로토콜 버퍼 읽기 및 쓰기
Azure Databricks는 Apache Spark 구조체와 프로토콜 버퍼(protobuf) 간의 직렬화 및 역직렬화에 대한 기본 지원을 제공합니다. Protobuf 지원은 Apache Spark DataFrame 변환기로 구현되며 구조적 스트리밍 또는 일괄 처리 작업에 사용할 수 있습니다.
프로토콜 버퍼를 역직렬화 및 직렬화하는 방법
Databricks Runtime 12.2 LTS 이상에서는 데이터를 직렬화하고 to_protobuf
역직렬화하는 데 사용하고 함수를 사용할 from_protobuf
수 있습니다. Protobuf serialization은 스트리밍 워크로드에서 일반적으로 사용됩니다.
protobuf 함수의 기본 구문은 읽기 및 쓰기 함수와 비슷합니다. 사용하려면 먼저 이러한 함수를 가져와야 합니다.
from_protobuf
는 이진 열을 구조체로 캐스팅하고 to_protobuf
구조체 열을 이진으로 캐스팅합니다. 인수로 지정된 스키마 레지스트리 또는 인수로 식별된 options
descFilePath
설명자 파일을 제공해야 합니다.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
다음 예제에서는 이진 protobuf 레코드를 from_protobuf()
처리하고 Spark SQL 구조체를 이진 protobuf로 to_protobuf()
변환하는 방법을 보여 줍니다.
Confluent 스키마 레지스트리에서 protobuf 사용
Azure Databricks는 Confluent 스키마 레지스트리를 사용하여 Protobuf를 정의하도록 지원합니다.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
외부 Confluent 스키마 레지스트리에 인증
외부 Confluent 스키마 레지스트리에 인증하려면 인증 자격 증명 및 API 키를 포함하도록 스키마 레지스트리 옵션을 업데이트합니다.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Unity 카탈로그 볼륨에서 truststore 및 키 저장소 파일 사용
Databricks Runtime 14.3 LTS 이상에서는 Unity 카탈로그 볼륨의 truststore 및 키 저장소 파일을 사용하여 Confluent 스키마 레지스트리에 인증할 수 있습니다. 다음 예제에 따라 스키마 레지스트리 옵션을 업데이트합니다.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
설명자 파일과 함께 Protobuf 사용
컴퓨팅 클러스터에서 사용할 수 있는 protobuf 설명자 파일을 참조할 수도 있습니다. 해당 위치에 따라 파일을 읽을 수 있는 적절한 권한이 있는지 확인합니다.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Protobuf 함수에서 지원되는 옵션
Protobuf 함수에서 지원되는 옵션은 다음과 같습니다.
- 모드: Protobuf 레코드를 역직렬화하는 동안 오류가 처리되는 방법을 결정합니다. 오류는 레코드의 실제 스키마와 제공된 예상 스키마 간의 불일치를 포함하여 다양한 형식의 잘못된 형식의 레코드로 인해 발생할 수 있습니다
from_protobuf()
.- 값:
FAILFAST
(기본값): 잘못된 형식의 레코드가 발생하고 작업이 실패하면 오류가 발생합니다.PERMISSIVE
: 형식이 잘못된 레코드에 대해 NULL이 반환됩니다. 많은 레코드가 삭제될 수 있으므로 이 옵션을 신중하게 사용합니다. 이는 원본에 있는 레코드의 작은 부분이 잘못된 경우에 유용합니다.
- 값:
- recursive.fields.max.depth: 재귀 필드에 대한 지원을 추가합니다. Spark SQL 스키마는 재귀 필드를 지원하지 않습니다. 이 옵션을 지정하지 않으면 재귀 필드가 허용되지 않습니다. Protobufs에서 재귀 필드를 지원하려면 지정된 깊이로 확장해야 합니다.
값:
-1(기본값): 재귀 필드는 허용되지 않습니다.
0: 재귀 필드가 삭제됩니다.
1: 단일 수준의 재귀를 허용합니다.
[2-10]: 최대 10개 수준까지 여러 재귀에 대한 임계값을 지정합니다.
값을 0보다 크게 설정하면 중첩된 필드를 구성된 깊이로 확장하여 재귀 필드를 허용합니다. 실수로 매우 큰 스키마를 만들지 않도록 10보다 큰 값은 허용되지 않습니다. Protobuf 메시지에 구성된 제한을 초과하는 깊이가 있는 경우 반환된 Spark 구조체는 재귀 제한 후에 잘립니다.
예: 다음 재귀 필드가 있는 Protobuf를 고려합니다.
message Person { string name = 1; Person friend = 2; }
다음은 이 설정에 대해 서로 다른 값을 가진 끝 스키마를 나열합니다.
- 1로 설정된 옵션:
STRUCT<name: STRING>
- 옵션 2로 설정:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- 3으로 설정된 옵션:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- 1로 설정된 옵션:
- convert.any.fields.to.json: 이 옵션을 사용하면 Protobuf Any 필드를 JSON으로 변환할 수 있습니다. 이 기능은 신중하게 사용하도록 설정해야 합니다. JSON 변환 및 처리는 비효율적입니다. 또한 JSON 문자열 필드는 Protobuf 스키마 안전성 때문에 다운스트림 처리에서 오류가 발생하기 쉽습니다.
값:
- False(기본값): 런타임에 이러한 와일드카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다. 스키마
(STRUCT<type_url: STRING, value: BINARY>)
가 있는 두 개의 필드가 있습니다. 기본적으로 이진value
필드는 어떤 방식으로도 해석되지 않습니다. 그러나 이진 데이터는 실제로 일부 애플리케이션에서 작동하는 것이 편리하지 않을 수 있습니다. - True: 이 값을 True로 설정하면 런타임에 필드를 JSON 문자열로 변환
Any
할 수 있습니다. 이 옵션을 사용하면 이진 파일이 구문 분석되고 Protobuf 메시지가 JSON 문자열로 역직렬화됩니다.
- False(기본값): 런타임에 이러한 와일드카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다. 스키마
예: 다음과 같이 정의된 두 가지 Protobuf 형식을 고려합니다.
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
이 옵션을 사용하도록 설정하면 스키마
from_protobuf("col", messageName ="ProtoWithAny")
는 다음과STRUCT<event_name: STRING, details: STRING>
같습니다.런타임에 필드에 Protobuf 메시지가 포함된
Person
경우details
반환된 값은 다음과('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
같습니다.요구 사항:
- 필드에 사용되는 가능한 모든 Protobuf 형식에
Any
대한 정의는 전달된from_protobuf()
Protobuf 설명자 파일에서 사용할 수 있어야 합니다. - Protobuf를 찾을 수 없으면
Any
해당 레코드에 대한 오류가 발생합니다. - 이 기능은 현재 스키마 레지스트리에서 지원되지 않습니다.
- 필드에 사용되는 가능한 모든 Protobuf 형식에
- emit.default.values: Protobuf를 Spark 구조체로 역직렬화할 때 값이 0인 렌더링 필드를 사용하도록 설정합니다. 이 옵션은 아쉽게 사용해야 합니다. 일반적으로 의미 체계에서 이러한 미세한 차이에 의존 하는 것이 좋습니다.
값
- False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는
null
것이 더 간단합니다. - True: 이 옵션을 사용하도록 설정하면 해당 필드가 해당 기본값으로 채워집니다.
- False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는
예: 다음과 같이
Person(age=0, middle_name="")
생성된 Protobuf를 사용하여 다음 Protobuf를 고려합니다.syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- 이 옵션을 False로 설정하면 호출
from_protobuf()
후 Spark 구조체가 모두 null이{"name": null, "age": null, "middle_name": "", "salary": null}
됩니다. 두 필드(age
및middle_name
)에 값이 설정되어 있더라도 Protobuf는 기본값이므로 와이어 형식으로 포함하지 않습니다. - 이 옵션을 True로 설정하면 호출
from_protobuf()
후 Spark 구조체는 다음과{"name": "", "age": 0, "middle_name": "", "salary": null}
같습니다.salary
필드는 명시적으로 선언되고optional
입력 레코드에 설정되지 않으므로 null로 유지됩니다.
- 이 옵션을 False로 설정하면 호출
- enums.as.ints: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
값
- False(기본값)
- True: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
예제: 다음 Protobuf를 고려합니다.
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
다음과 같은
Person(job = ENGINEER)
Protobuf 메시지가 표시됩니다.- 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다
{"job": "ENGINEER"}
. - 이 옵션을 사용하도록 설정하면 해당 Spark 구조체가 됩니다
{"job": 1}
.
이러한 필드의 스키마는 각 사례에서 다릅니다(기본 문자열이 아닌 정수). 이러한 변경은 다운스트림 테이블의 스키마에 영향을 줄 수 있습니다.
- 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다
스키마 레지스트리 옵션
다음 스키마 레지스트리 옵션은 Protobuf 함수와 함께 스키마 레지스트리를 사용하는 동안 관련이 있습니다.
- schema.registry.subject
- Required
- 스키마 레지스트리의 스키마 제목(예: "client-event")을 지정합니다.
- schema.registry.address
- Required
- 스키마 레지스트리의 URL(예: )
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- 선택 사항
- 기본값:
<NONE>
. - 주체에 대한 스키마 레지스트리 항목에는 단일
proto
파일처럼 여러 Protobuf 정의가 포함될 수 있습니다. 이 옵션을 지정하지 않으면 스키마에 첫 번째 Protobuf가 사용됩니다. 항목의 첫 번째 메시지가 아닌 경우 Protobuf 메시지의 이름을 지정합니다. 예를 들어 두 개의 Protobuf 정의인 "Person" 및 "Location"이 있는 항목을 순서대로 고려해 보세요. 스트림이 "Person"이 아닌 "Location"에 해당하는 경우 이 옵션을 "위치"(또는 패키지 "com.example.protos.Location"를 포함한 전체 이름)로 설정합니다.
- schema.registry.schema.evolution.mode
- 기본값: "restart".
- 지원되는 모드:
- "restart"
- "none"
- 이 옵션은 스키마 진화 모드를
from_protobuf()
설정합니다. 쿼리 시작 시 Spark는 지정된 주제에 대한 최신 스키마 ID를 기록합니다. 그러면 에 대한from_protobuf()
스키마가 결정됩니다. 쿼리가 시작된 후 새 스키마가 스키마 레지스트리에 게시될 수 있습니다. 들어오는 레코드에서 최신 스키마 ID가 발견되면 스키마가 변경되었음을 나타냅니다. 이 옵션은 스키마에 대한 이러한 변경을 처리하는 방법을 결정합니다.- restart (기본값): 최신 스키마 ID가 발견되면 트리거
UnknownFieldException
됩니다. 그러면 쿼리가 종료됩니다. Databricks는 쿼리 실패 시 다시 시작하도록 작업을 구성하여 스키마 변경 내용을 선택하는 것이 좋습니다. - none: 스키마 ID 변경 내용은 무시됩니다. 최신 스키마 ID가 있는 레코드는 쿼리 시작 시 관찰된 것과 동일한 스키마로 구문 분석됩니다. 최신 Protobuf 정의는 이전 버전과 호환될 것으로 예상되며 새 필드는 무시됩니다.
- restart (기본값): 최신 스키마 ID가 발견되면 트리거
- confluent.schema.registry.
<schema-registy-client-option>
- 선택 사항
- 스키마 레지스트리는 Confluent 스키마 레지스트리 클라이언트를 사용하여 Confluent 스키마 레지스트리에 연결합니다. 클라이언트에서 지원하는 모든 구성 옵션은 "confluent.schema.registry" 접두사를 사용하여 지정할 수 있습니다. 예를 들어 다음 두 설정은 "USER_INFO" 인증 자격 증명을 제공합니다.
- "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"