Aracılığıyla paylaş


Verileri yarı yapılandırılmış değişken türü olarak içe aktar

Önemli

Bu özellik Genel Önizlemededir.

Databricks Runtime 15.3 ve üzerindeki sürümlerde, VARIANT türünü kullanarak yarı yapılandırılmış verileri alabilirsiniz. Bu makalede davranış açıklanır ve Otomatik Yükleyici ve COPY INTOkullanarak bulut nesne depolama alanından veri almak, Kafka'dan kayıt akışı yapmak ve değişken verilerle yeni tablolar oluşturmak veya değişken türünü kullanarak yeni kayıtlar eklemek için SQL komutları için örnek desenler sağlanır. Aşağıdaki tabloda desteklenen dosya biçimleri ve Databricks Runtime sürümü desteği özetlenmektedir:

Dosya biçimi Desteklenen Databricks Runtime sürümü
JSON veri formatı 15.3 ve üzeri
XML 16.4 ve üzeri
CSV 16.4 ve üzeri

Bkz. Değişken verileri sorgulama.

Değişken sütunlu tablo oluşturma

VARIANT , Databricks Runtime 15.3 ve üzeri sürümlerin standart bir SQL türüdür ve Delta Lake tarafından desteklenen tablolar tarafından desteklenir. Azure Databricks'te yönetilen tablolar varsayılan olarak Delta Lake kullanır, böylece aşağıdaki söz dizimini kullanarak tek VARIANT sütunlu boş bir tablo oluşturabilirsiniz.

CREATE TABLE table_name (variant_column VARIANT)

Alternatif olarak, değişken sütunlu bir tablo oluşturmak için CTAS deyimi kullanabilirsiniz. PARSE_JSON JSON dizelerini ayrıştırmak için işlevini veya FROM_XML XML dizelerini ayrıştırmak için işlevini kullanın. Aşağıdaki örnek, iki sütunlu bir tablo oluşturur.

  • id sütunu, JSON dizesinden STRING türü olarak ayıklanır.
  • variant_column türü olarak VARIANT kodlanmış JSON dizesinin tamamını içerir.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Not Düşün

Databricks, sorguları hızlandırmak ve depolama düzenini iyileştirmek için sık sorgulanan alanların ayıklanıp değişken olmayan sütunlar olarak depolanmasını önerir.

VARIANT sütunlar kümeleme anahtarları, bölümler veya Z sırası anahtarları için kullanılamaz. VARIANT veri türü karşılaştırmalar, gruplandırma, sıralama ve ayarlama işlemleri için kullanılamaz. Daha fazla bilgi için bkz . Sınırlamalar.

Veriyi parse_json kullanarak ekleyin

Eğer hedef tabloda VARIANT olarak kodlanmış bir sütun zaten varsa, JSON dize kayıtlarını parse_json kullanarak VARIANT olarak ekleyebilirsiniz. Örneğin, json_string sütunundaki JSON dizelerini ayrıştırın ve table_name içine ekleyin.

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Piton

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

kullanarak veri ekleme from_xml

Hedef tablo zaten VARIANT olarak kodlanmış bir sütun içeriyorsa, from_xml kullanarak XML dizesi kayıtlarını VARIANT olarak ekleyebilirsiniz. Örneğin, XML dizelerini xml_string sütunundan ayrıştırın ve onları table_name sütununa ekleyin.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Piton

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

kullanarak veri ekleme from_csv

Hedef tablo zaten VARIANT olarak kodlanmış bir sütun içeriyorsa, CSV dizesi kayıtlarını from_csv kullanarak VARIANT olarak ekleyebilirsiniz. Örneğin, CSV kayıtlarını csv_string sütunundan ayrıştırın ve table_name'e ekleyin.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Piton

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Bulut nesne depolama alanından değişken olarak veri alma

Otomatik Yükleyici, desteklenen dosya kaynaklarından gelen tüm verileri hedef tabloda tek VARIANT bir sütun olarak yüklemek için kullanılabilir. VARIANT şemaya ve tür değişikliklerine esnek olduğundan ve veri kaynağında bulunan büyük/küçük harf duyarlılığını ve NULL değerlerini koruduğundan, bu düzen aşağıdaki istisnalarla çoğu yükleme senaryosu için sağlamdır:

  • Hatalı biçimlendirilmiş kayıtlar VARIANT tipi kullanılarak kodlanamaz.
  • VARIANT türü yalnızca 16 MB'a kadar olan kayıtları barındırabilir.

Not Düşün

Variant, aşırı büyük kayıtları bozuk kayıtlara benzer şekilde ele alır. Varsayılan PERMISSIVE işleme modunda, fazla büyük kayıtlar corruptRecordColumn içinde yakalanır.

Kaydın tamamı tek VARIANT bir sütun olarak kaydedildiğinden, alma sırasında şema evrimi gerçekleşmez ve rescuedDataColumn desteklenmez. Aşağıdaki örnekte, hedef tablonun tek bir VARIANT sütunuyla zaten var olduğu varsayılır.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Ayrıca bir şema tanımlarken veya VARIANTgeçirirken schemaHints belirtebilirsiniz. Başvuruda bulunılan kaynak alanındaki veriler geçerli bir kayıt içermelidir. Aşağıdaki örneklerde bu söz dizimi gösterilmektedir.

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Varyant ile kullan COPY INTO

Databricks, mevcut olduğu durumda COPY INTO yerine Otomatik Yükleyici'nin kullanılmasını önerir.

COPY INTO desteklenen bir veri kaynağının tüm içeriğinin tek bir sütun olarak alımını destekler. Aşağıdaki örnek, tek bir VARIANT sütunuyla yeni bir tablo oluşturur ve ardından COPY INTO kullanarak bir JSON dosya kaynağından kayıt alır.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FILES = ('file-name')
  FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')

Kafka verilerini değişken olarak akışla aktarma

Birçok Kafka akışı JSON kullanarak yüklerini kodlar. VARIANT kullanarak Kafka akışlarının alımı, bu iş yüklerinin şema değişikliklerine karşı sağlam olmasını sağlar.

Aşağıdaki örnekte Kafka akış kaynağını okuma, keySTRING ve valueVARIANTolarak atama ve hedef tabloya yazma işlemleri gösterilmektedir.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Sonraki Adımlar