Condividi tramite


Inserire dati come tipo variant semistrutturato

Importante

Questa funzionalità è disponibile in anteprima pubblica.

In Databricks Runtime 15.3 e versioni successive è possibile usare il VARIANT tipo per inserire dati semistrutturati. Questo articolo descrive il comportamento e fornisce modelli di esempio per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico e COPY INTO, lo streaming di record da Kafka e i comandi SQL per la creazione di nuove tabelle con dati variant o l'inserimento di nuovi record usando il tipo variant. La tabella seguente riepiloga i formati di file supportati e il supporto della versione di Databricks Runtime:

Formato del file Versione supportata di Databricks Runtime
JSON (JavaScript Object Notation) 15.3 e versioni successive
XML 16.4 e versioni successive
CSV 16.4 e versioni successive

Consulta Dati delle varianti delle query.

Creare una tabella con una colonna variante

VARIANT è un tipo SQL standard in Databricks Runtime 15.3 e versioni successive ed è supportato dalle tabelle supportate da Delta Lake. Le tabelle gestite in Azure Databricks usano Delta Lake per impostazione predefinita, quindi è possibile creare una tabella vuota con una singola VARIANT colonna usando la sintassi seguente.

CREATE TABLE table_name (variant_column VARIANT)

In alternativa, è possibile usare un'istruzione CTAS per creare una tabella con una colonna variante. Usare la PARSE_JSON funzione per analizzare le stringhe JSON o la FROM_XML funzione per analizzare le stringhe XML. Nell'esempio seguente viene creata una tabella con due colonne.

  • La id colonna viene estratta dalla stringa JSON come STRING tipo.
  • variant_column contiene l'intera stringa JSON codificata come VARIANT tipo.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Nota

Databricks consiglia di estrarre campi sottoposti a query frequenti e di archiviarli come colonne non varianti per accelerare le query e ottimizzare il layout di archiviazione.

VARIANT colonne non possono essere usate per chiavi di clustering, partizioni o chiavi con Z-order. Non è possibile utilizzare il tipo di dati VARIANT per le operazioni di confronto, raggruppamento, ordinamento e set. Per altre informazioni, vedere Limitazioni.

Inserire dati usando parse_json

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare parse_json per inserire record di stringa JSON come VARIANT. Ad esempio, analizzare le stringhe JSON dalla json_string colonna e inserirle in table_name.

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Pitone

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati usando from_xml

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare from_xml per inserire record di stringa XML come VARIANT. Ad esempio, analizzare le stringhe XML dalla xml_string colonna e inserirle in table_name.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Pitone

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati usando from_csv

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare from_csv per inserire record di stringa CSV come VARIANT. Ad esempio, analizzare i record CSV dalla csv_string colonna e inserirli in table_name.

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Pitone

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati dall'archiviazione di oggetti cloud come variante

Il caricatore automatico può essere usato per caricare tutti i dati dalle origini file supportate come singola VARIANT colonna in una tabella di destinazione. Poiché VARIANT è flessibile per la modifica dello schema e del tipo e mantiene la distinzione tra maiuscole e minuscole e i valori NULL presenti nell'origine dati, questo modello è affidabile per la maggior parte degli scenari di inserimento con le avvertenze seguenti:

  • I record malformati non possono essere codificati usando tipo VARIANT.
  • VARIANT il tipo può contenere solo record di dimensioni fino a 16 MB.

Nota

Variant considera i record eccessivamente grandi come simili ai record danneggiati. Nella modalità di elaborazione predefinita PERMISSIVE , i record eccessivamente grandi vengono acquisiti in corruptRecordColumn.

Poiché l'intero record viene registrato come una singola VARIANT colonna, non viene eseguita alcuna evoluzione dello schema durante l'inserimento e rescuedDataColumn non è supportata. Nell'esempio seguente si presuppone che la tabella di destinazione esista già con una singola colonna VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

È anche possibile specificare VARIANT quando si definisce uno schema o si passa schemaHints. I dati nel campo di origine a cui si fa riferimento devono contenere un record valido. Negli esempi seguenti viene illustrata questa sintassi.

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Usare COPY INTO con variante

Databricks consiglia di utilizzare Auto Loader rispetto a COPY INTO quando sia disponibile.

COPY INTO supporta l'inserimento dell'intero contenuto di un'origine di dati supportata come singola colonna. L'esempio seguente crea una nuova tabella con una singola colonna VARIANT e quindi usa COPY INTO per inserire record da un'origine file JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FILES = ('file-name')
  FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')

Trasmettere dati Kafka come variabile

Molti flussi Kafka codificano i payload usando JSON. L'inserimento di flussi Kafka tramite VARIANT rende questi carichi di lavoro affidabili per le modifiche dello schema.

L'esempio seguente illustra la lettura di un'origine di streaming Kafka, la conversione del key in STRING e del value in VARIANT, e la scrittura in una tabella di destinazione.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Passaggi successivi