다음을 통해 공유


스트리밍 Avro 데이터 읽기 및 쓰기

Apache Avro는 스트리밍에서 일반적으로 사용되는 데이터 serialization 시스템입니다. 일반적인 솔루션은 데이터를 Apache Kafka의 Avro 형식, Confluent 스키마 레지스트리의 메타데이터로 변환한 Kafka 및 스키마 레지스트리에 연결되는 스트리밍 프레임워크를 사용하여 쿼리를 실행하는 것입니다.

Azure Databricks는 Kafka의 from_avro Avro 데이터 및 스키마 레지스트리의 메타데이터를 사용하여 스트리밍 파이프라인을 빌드하는 기능 및 to_avro 기능을 지원합니다. to_avro 함수는 열을 Avro 형식의 이진 데이터로 인코딩하고 from_avro는 Avro 이진 데이터를 열로 디코딩합니다. 두 함수 모두 한 열을 다른 열로 변환하고 입력/출력 SQL 데이터 형식은 복합 형식 또는 기본 형식일 수 있습니다.

참고 항목

from_avroto_avro 함수에는 다음과 같은 특성이 있습니다.

  • Python, Scala 및 Java에서 사용할 수 있습니다.
  • 일괄 처리 및 스트리밍 쿼리에서 SQL 함수로 전달할 수 있습니다.

Avro 파일 데이터 원본도 참조하세요.

수동으로 지정한 스키마 예제

from_json 및 to_json 마찬가지로 모든 이진 열을 사용할 from_avro to_avro 수 있습니다. 다음 예제와 같이 Avro 스키마를 수동으로 지정할 수 있습니다.

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema 예제

스키마를 JSON 문자열로 지정할 수도 있습니다. 예를 들어 /tmp/user.avsc가 다음과 같고

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

다음과 같이 JSON 문자열을 만들 수 있습니다.

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

그런 다음 from_avro에서 스키마를 사용합니다.

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

스키마 레지스트리를 사용하는 예제

클러스터에 스키마 레지스트리 서비스가 있는 경우 from_avro와 함께 사용하면 Avro 스키마를 수동으로 지정할 필요가 없습니다.

다음 예제에서는 키와 값이 이미 스키마 레지스트리에 형식 STRING 의 "t-key" 및 "t-value" 주체로 등록되어 있다고 가정하여 Kafka 토픽 "t"를 읽는 방법을 보여 줍니다 INT.

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

to_avro의 경우 기본 출력 Avro 스키마는 다음과 같은 이유로 스키마 레지스트리 서비스에서 대상 주체인 스키마와 일치하지 않을 수 있습니다.

  • Spark SQL 형식에서 Avro 스키마로 일대일 매핑되지 않습니다. Spark SQL에 지원되는 유형 -> Avro 변환을 참조하세요.
  • 변환된 출력 Avro 스키마가 레코드 형식인 경우 레코드 이름은 topLevelRecord이고 기본적으로 네임스페이스가 없습니다.

to_avro의 기본 출력 스키마가 대상 주체의 스키마와 일치하는 경우 다음을 수행할 수 있습니다.

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

그렇지 않으면 to_avro 함수에서 대상 주체의 스키마를 제공해야 합니다.

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

외부 Confluent 스키마 레지스트리에 인증

Databricks Runtime 12.2 LTS 이상에서는 외부 Confluent 스키마 레지스트리에 인증할 수 있습니다. 다음 예제에서는 인증 자격 증명 및 API 키를 포함하도록 스키마 레지스트리 옵션을 구성하는 방법을 보여 줍니다.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity 카탈로그 볼륨에서 truststore 및 키 저장소 파일 사용

Databricks Runtime 14.3 LTS 이상에서는 Unity 카탈로그 볼륨의 truststore 및 키 저장소 파일을 사용하여 Confluent 스키마 레지스트리에 인증할 수 있습니다. 다음 구문을 사용하여 이전 예제의 구성을 업데이트합니다.

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

다음으로 스키마 진화 모드 사용 from_avro

Databricks Runtime 14.2 이상에서는 스키마 진화 모드를 from_avro사용할 수 있습니다. 스키마 진화 모드를 사용하도록 설정하면 스키마 진화를 감지한 UnknownFieldException 후 작업이 throw됩니다. Databricks는 스키마 진화 모드로 작업을 구성하여 작업 실패 시 자동으로 다시 시작하는 것이 좋습니다. 실패 시 스트리밍 쿼리를 다시 시작하도록 구조적 스트리밍 작업 구성을 참조하세요.

스키마 진화는 원본 데이터의 스키마가 시간이 지남에 따라 진화하고 데이터 원본에서 모든 필드를 수집할 것으로 예상하는 경우에 유용합니다. 쿼리가 이미 데이터 원본에서 쿼리할 필드를 명시적으로 지정하는 경우 스키마 진화와 관계없이 추가된 필드는 무시됩니다.

스키마 진화를 사용하도록 설정하려면 이 avroSchemaEvolutionMode 옵션을 사용합니다. 다음 표에서는 스키마 진화 모드에 대한 옵션을 설명합니다.

옵션 동작
none Default입니다. 스키마 진화를 무시하고 작업이 계속됩니다.
restart 스키마 진화를 UnknownFieldException 감지할 때 throw합니다. 작업을 다시 시작해야 합니다.

참고 항목

스트리밍 작업 간에 이 구성을 변경하고 동일한 검사점을 다시 사용할 수 있습니다. 스키마를 사용하지 않도록 설정하면 열이 삭제될 수 있습니다.

구문 분석 모드 구성

구문 분석 모드를 구성하여 스키마 진화 모드가 비활성화되고 스키마가 이전 버전과 호환되지 않는 방식으로 진화할 때 null 레코드를 실패할지 또는 내보낼지 여부를 결정할 수 있습니다. 기본 설정을 from_avro 사용하면 호환되지 않는 스키마 변경 내용이 관찰되면 실패합니다.

mode 문 분석 모드를 지정하려면 이 옵션을 사용합니다. 다음 표에서는 구문 분석 모드에 대한 옵션을 설명합니다.

옵션 동작
FAILFAST Default입니다. 구문 분석 오류는 다음을 throw합니다 SparkException errorClass MALFORMED_AVRO_MESSAGE.
PERMISSIVE 구문 분석 오류가 무시되고 null 레코드가 내보내집니다.

참고 항목

스키마 진화를 사용하도록 설정하면 FAILFAST 레코드가 손상된 경우에만 예외가 throw됩니다.

스키마 진화 및 구문 분석 모드 설정 사용 예제

다음 예제에서는 스키마 진화를 사용하도록 설정하고 Confluent 스키마 레지스트리를 사용하여 구문 분석 모드를 지정하는 FAILFAST 방법을 보여 줍니다.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)