Condividi tramite


Inserire dati come tipo variant semistrutturato

Importante

Questa funzionalità è disponibile in anteprima pubblica. È possibile confermare la registrazione in anteprima nella pagina Anteprime . Vedere Gestire le anteprime di Azure Databricks.

In Databricks Runtime 15.3 e versioni successive è possibile usare il VARIANT tipo per inserire dati semistrutturati. Questo articolo descrive il comportamento e fornisce modelli di esempio per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico e COPY INTO, lo streaming di record da Kafka e i comandi SQL per la creazione di nuove tabelle con dati variant o l'inserimento di nuovi record usando il tipo variant. La tabella seguente riepiloga i formati di file supportati e il supporto della versione di Databricks Runtime:

Formato del file Versione supportata di Databricks Runtime
JSON (JavaScript Object Notation) 15.3 e versioni successive
XML 16.4 e versioni successive
CSV 16.4 e versioni successive

Consulta Dati delle varianti delle query.

Creare una tabella con una colonna variante

VARIANT è un tipo SQL standard in Databricks Runtime 15.3 e versioni successive e supportato dalle tabelle supportate da Delta Lake. Le tabelle gestite in Azure Databricks usano Delta Lake per impostazione predefinita, quindi è possibile creare una tabella vuota con una singola colonna VARIANT usando la sintassi seguente:

CREATE TABLE table_name (variant_column VARIANT)

In alternativa, è possibile usare la PARSE_JSON funzione in una stringa JSON o la FROM_XML funzione in una stringa XML per usare un'istruzione CTAS per creare una tabella con una colonna variante. Nell'esempio seguente viene creata una tabella con due colonne:

  • La colonna id è stata estratta dalla stringa JSON come tipo STRING.
  • La colonna variant_column contiene l'intera stringa JSON codificata come tipo VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Nota

Databricks consiglia di estrarre e archiviare i campi come colonne non varianti che si prevede di usare per accelerare le query e ottimizzare il layout di archiviazione.

VARIANT colonne non possono essere usate per chiavi di clustering, partizioni o chiavi con Z-order. Non è possibile utilizzare il tipo di dati VARIANT per le operazioni di confronto, raggruppamento, ordinamento e set. Per un elenco completo delle limitazioni, vedere limitazioni .

Inserire dati usando parse_json

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare parse_json per inserire record stringa JSON come VARIANT, come nell'esempio seguente:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Pitone

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati usando from_xml

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare from_xml per inserire record di stringa XML come VARIANT. Per esempio:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Pitone

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati usando from_csv

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare from_csv per inserire record di stringa XML come VARIANT. Per esempio:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Pitone

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati dall'archiviazione di oggetti cloud come variante

Il caricatore automatico può essere usato per caricare tutti i dati dalle origini file supportate come singola VARIANT colonna in una tabella di destinazione. Poiché VARIANT è flessibile per la modifica dello schema e del tipo e mantiene la distinzione tra maiuscole e minuscole e i valori NULL presenti nell'origine dati, questo modello è affidabile per la maggior parte degli scenari di inserimento con le avvertenze seguenti:

  • I record malformati non possono essere codificati usando tipo VARIANT.
  • VARIANT il tipo può contenere solo record di dimensioni fino a 16 MB.

Nota

Variant considera i record eccessivamente grandi come simili ai record danneggiati. Nella modalità di elaborazione predefinita PERMISSIVE , i record eccessivamente grandi vengono acquisiti in corruptRecordColumn.

Poiché l'intero record viene registrato come una singola VARIANT colonna, non viene eseguita alcuna evoluzione dello schema durante l'inserimento e rescuedDataColumn non è supportata. Nell'esempio seguente si presuppone che la tabella di destinazione esista già con una singola colonna VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

È anche possibile specificare VARIANT quando si definisce uno schema o si passa schemaHints. I dati nel campo di origine a cui si fa riferimento devono contenere un record valido. Gli esempi seguenti illustrano questa sintassi:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Usare COPY INTO con variante

Databricks consiglia di utilizzare Auto Loader rispetto a COPY INTO quando sia disponibile.

COPY INTO supporta l'inserimento dell'intero contenuto di un'origine di dati supportata come singola colonna. L'esempio seguente crea una nuova tabella con una singola colonna VARIANT e quindi usa COPY INTO per inserire record da un'origine file JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Trasmettere dati Kafka come variabile

Molti flussi Kafka codificano i payload usando JSON. L'inserimento di flussi Kafka tramite VARIANT rende questi carichi di lavoro affidabili per le modifiche dello schema.

L'esempio seguente illustra la lettura di un'origine di streaming Kafka, la conversione del key in STRING e del value in VARIANT, e la scrittura in una tabella di destinazione.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)