중요한
이 기능은 공개 미리 보기 상태입니다.
Databricks Runtime 15.3 이상에서는 이 형식을 VARIANT
사용하여 반구조화된 데이터를 수집할 수 있습니다. 이 문서에서는 동작을 설명하고 자동 로더 및 COPY INTO
사용하여 클라우드 개체 스토리지에서 데이터를 수집하기 위한 예제 패턴, Kafka에서 레코드 스트리밍, 변형 데이터로 새 테이블을 만들거나 변형 형식을 사용하여 새 레코드를 삽입하기 위한 SQL 명령을 제공합니다. 다음 표에는 지원되는 파일 형식 및 Databricks 런타임 버전 지원이 요약되어 있습니다.
파일 형식 | 지원되는 Databricks 런타임 버전 |
---|---|
JSON (자바스크립트 객체 표기법) | 15.3 이상 |
XML | 16.4 이상 |
CSV | 16.4 이상 |
쿼리 변형 데이터를 참조하세요.
변형 열이 포함된 테이블을 만들기
VARIANT
Databricks Runtime 15.3 이상의 표준 SQL 형식이며 Delta Lake에서 지원하는 테이블에서 지원됩니다. Azure Databricks의 관리되는 테이블은 기본적으로 Delta Lake를 사용하므로 다음 구문을 사용하여 단일 VARIANT
열이 있는 빈 테이블을 만들 수 있습니다.
CREATE TABLE table_name (variant_column VARIANT)
또는 JSON 문자열의 PARSE_JSON
함수 또는 XML 문자열의 FROM_XML
함수를 사용하여 CTAS 문을 사용하여 변형 열이 있는 테이블을 만들 수 있습니다. 다음 예제에서는 두 개의 열이 있는 테이블을 만듭니다.
- JSON 문자열에서
id
열을STRING
형식으로 추출한 것입니다. -
variant_column
열에는VARIANT
형식으로 인코딩된 전체 JSON 문자열이 포함됩니다.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
참고
Databricks는 쿼리를 가속화하고 스토리지 레이아웃을 최적화하는 데 사용할 비 변형 열로 필드를 추출하고 저장하는 것이 좋습니다.
VARIANT
열은 클러스터링 키, 파티션 또는 Z 순서 키에 사용할 수 없습니다.
VARIANT
데이터 형식은 비교, 그룹화, 순서 지정 및 설정 작업에 사용할 수 없습니다. 제한 사항의 전체 목록은 제한 사항참조하세요.
parse_json
사용하여 데이터 삽입
대상 테이블에 VARIANT
인코딩된 열이 이미 있는 경우 다음 예제와 같이 parse_json
사용하여 JSON 문자열 레코드를 VARIANT
삽입할 수 있습니다.
SQL (영문)
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
파이썬
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
from_xml
사용하여 데이터 삽입
대상 테이블에 이미 VARIANT
로 인코딩된 열이 있는 경우 from_xml
을 사용하여 XML 문자열 레코드를 VARIANT
로 삽입할 수 있습니다. 다음은 그 예입니다.
SQL (영문)
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
파이썬
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
를 사용하여 데이터 삽입 from_csv
대상 테이블에 이미 VARIANT
로 인코딩된 열이 있는 경우 from_xml
을 사용하여 XML 문자열 레코드를 VARIANT
로 삽입할 수 있습니다. 다음은 그 예입니다.
SQL (영문)
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
파이썬
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
클라우드 객체 스토리지에서 데이터를 변이 형식으로 수집
자동 로더를 사용하여 지원되는 파일 원본의 모든 데이터를 대상 테이블의 단일 VARIANT
열로 로드할 수 있습니다.
VARIANT
스키마 및 형식 변경에 유연하고 데이터 원본에 있는 대/소문자 민감도 및 NULL
값을 유지 관리하므로 이 패턴은 다음과 같은 주의 사항이 있는 대부분의 수집 시나리오에 강력합니다.
- 형식이 잘못된 레코드는
VARIANT
유형을 사용하여 인코딩할 수 없습니다. -
VARIANT
type은 최대 16mb 크기의 레코드만 보유할 수 있습니다.
참고
Variant는 손상된 레코드와 비슷하게 지나치게 큰 레코드를 처리합니다. 기본 PERMISSIVE
처리 모드에서는 지나치게 큰 레코드가 에 corruptRecordColumn
캡처됩니다.
전체 레코드는 단일 VARIANT
열로 기록되므로 수집 중에 스키마가 진화하지 않으며 rescuedDataColumn
지원되지 않습니다. 다음 예제에서는 대상 테이블이 이미 단일 VARIANT
열과 함께 존재한다고 가정합니다.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
스키마를 정의하거나 VARIANT
전달할 때 schemaHints
지정할 수도 있습니다. 참조된 원본 필드의 데이터에는 유효한 레코드가 포함되어야 합니다. 다음 예제에서는 이 구문을 보여 줍니다.
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
COPY INTO
를 변형과 함께 사용
Databricks는 COPY INTO
대신 사용할 수 있을 때 자동 로더를 사용할 것을 권장합니다.
COPY INTO
에서는 지원되는 데이터 원본의 전체 콘텐츠를 단일 열로 수집할 수 있습니다. 다음 예제에서는 단일 VARIANT
열이 있는 새 테이블을 만든 다음 COPY INTO
사용하여 JSON 파일 원본에서 레코드를 수집합니다.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Kafka 데이터를 변형으로 스트리밍
많은 Kafka 스트림은 JSON을 사용하여 페이로드를 인코딩합니다.
VARIANT
사용하여 Kafka 스트림을 수집하면 이러한 워크로드가 스키마 변경에 적응할 수 있게 됩니다.
다음 예제에서는 Kafka 스트리밍 원본을 읽고, key
를 STRING
로, value
를 VARIANT
로 캐스팅하여 대상 테이블에 쓰는 방법을 보여 줍니다.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)