Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Bu özellik Genel Önizlemededir.
Databricks Runtime 15.3 ve üzerindeki sürümlerde, VARIANT türünü kullanarak yarı yapılandırılmış verileri alabilirsiniz. Bu makalede davranış açıklanır ve Otomatik Yükleyici ve COPY INTOkullanarak bulut nesne depolama alanından veri almak, Kafka'dan kayıt akışı yapmak ve değişken verilerle yeni tablolar oluşturmak veya değişken türünü kullanarak yeni kayıtlar eklemek için SQL komutları için örnek desenler sağlanır. Aşağıdaki tabloda desteklenen dosya biçimleri ve Databricks Runtime sürümü desteği özetlenmektedir:
| Dosya biçimi | Desteklenen Databricks Runtime sürümü |
|---|---|
| JSON veri formatı | 15.3 ve üzeri |
| XML | 16.4 ve üzeri |
| CSV | 16.4 ve üzeri |
Bkz. Değişken verileri sorgulama.
Değişken sütunlu tablo oluşturma
VARIANT , Databricks Runtime 15.3 ve üzeri sürümlerin standart bir SQL türüdür ve Delta Lake tarafından desteklenen tablolar tarafından desteklenir. Azure Databricks'te yönetilen tablolar varsayılan olarak Delta Lake kullanır, böylece aşağıdaki söz dizimini kullanarak tek VARIANT sütunlu boş bir tablo oluşturabilirsiniz.
CREATE TABLE table_name (variant_column VARIANT)
Alternatif olarak, değişken sütunlu bir tablo oluşturmak için CTAS deyimi kullanabilirsiniz.
PARSE_JSON JSON dizelerini ayrıştırmak için işlevini veya FROM_XML XML dizelerini ayrıştırmak için işlevini kullanın. Aşağıdaki örnek, iki sütunlu bir tablo oluşturur.
-
idsütunu, JSON dizesindenSTRINGtürü olarak ayıklanır. -
variant_columntürü olarakVARIANTkodlanmış JSON dizesinin tamamını içerir.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Not Düşün
Databricks, sorguları hızlandırmak ve depolama düzenini iyileştirmek için sık sorgulanan alanların ayıklanıp değişken olmayan sütunlar olarak depolanmasını önerir.
VARIANT sütunlar kümeleme anahtarları, bölümler veya Z sırası anahtarları için kullanılamaz.
VARIANT veri türü karşılaştırmalar, gruplandırma, sıralama ve ayarlama işlemleri için kullanılamaz. Daha fazla bilgi için bkz . Sınırlamalar.
Veriyi parse_json kullanarak ekleyin
Eğer hedef tabloda VARIANT olarak kodlanmış bir sütun zaten varsa, JSON dize kayıtlarını parse_json kullanarak VARIANT olarak ekleyebilirsiniz. Örneğin, json_string sütunundaki JSON dizelerini ayrıştırın ve table_name içine ekleyin.
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Piton
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
kullanarak veri ekleme from_xml
Hedef tablo zaten VARIANT olarak kodlanmış bir sütun içeriyorsa, from_xml kullanarak XML dizesi kayıtlarını VARIANT olarak ekleyebilirsiniz. Örneğin, XML dizelerini xml_string sütunundan ayrıştırın ve onları table_name sütununa ekleyin.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Piton
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
kullanarak veri ekleme from_csv
Hedef tablo zaten VARIANT olarak kodlanmış bir sütun içeriyorsa, CSV dizesi kayıtlarını from_csv kullanarak VARIANT olarak ekleyebilirsiniz. Örneğin, CSV kayıtlarını csv_string sütunundan ayrıştırın ve table_name'e ekleyin.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Piton
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Bulut nesne depolama alanından değişken olarak veri alma
Otomatik Yükleyici, desteklenen dosya kaynaklarından gelen tüm verileri hedef tabloda tek VARIANT bir sütun olarak yüklemek için kullanılabilir.
VARIANT şemaya ve tür değişikliklerine esnek olduğundan ve veri kaynağında bulunan büyük/küçük harf duyarlılığını ve NULL değerlerini koruduğundan, bu düzen aşağıdaki istisnalarla çoğu yükleme senaryosu için sağlamdır:
- Hatalı biçimlendirilmiş kayıtlar
VARIANTtipi kullanılarak kodlanamaz. -
VARIANTtürü yalnızca 16 MB'a kadar olan kayıtları barındırabilir.
Not Düşün
Variant, aşırı büyük kayıtları bozuk kayıtlara benzer şekilde ele alır. Varsayılan PERMISSIVE işleme modunda, fazla büyük kayıtlar corruptRecordColumn içinde yakalanır.
Kaydın tamamı tek VARIANT bir sütun olarak kaydedildiğinden, alma sırasında şema evrimi gerçekleşmez ve rescuedDataColumn desteklenmez. Aşağıdaki örnekte, hedef tablonun tek bir VARIANT sütunuyla zaten var olduğu varsayılır.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Ayrıca bir şema tanımlarken veya VARIANTgeçirirken schemaHints belirtebilirsiniz. Başvuruda bulunılan kaynak alanındaki veriler geçerli bir kayıt içermelidir. Aşağıdaki örneklerde bu söz dizimi gösterilmektedir.
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Varyant ile kullan COPY INTO
Databricks, mevcut olduğu durumda COPY INTO yerine Otomatik Yükleyici'nin kullanılmasını önerir.
COPY INTO desteklenen bir veri kaynağının tüm içeriğinin tek bir sütun olarak alımını destekler. Aşağıdaki örnek, tek bir VARIANT sütunuyla yeni bir tablo oluşturur ve ardından COPY INTO kullanarak bir JSON dosya kaynağından kayıt alır.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
Kafka verilerini değişken olarak akışla aktarma
Birçok Kafka akışı JSON kullanarak yüklerini kodlar.
VARIANT kullanarak Kafka akışlarının alımı, bu iş yüklerinin şema değişikliklerine karşı sağlam olmasını sağlar.
Aşağıdaki örnekte Kafka akış kaynağını okuma, keySTRING ve valueVARIANTolarak atama ve hedef tabloya yazma işlemleri gösterilmektedir.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Sonraki Adımlar
- Değişken verileri sorgulama.
- Değişken türü desteğini yapılandırın.
- Otomatik Yükleyici hakkında daha fazla bilgi edinin. Bkz. Otomatik Yükleyici nedir?.