Freigeben über


Erfassen von Daten als halbstrukturierter Variantentyp

Wichtig

Dieses Feature befindet sich in der Public Preview.

In Databricks Runtime 15.3 und höher können Sie den VARIANT Typ verwenden, um halbstrukturierte Daten zu erfassen. In diesem Artikel werden das Verhalten beschrieben und Beispielmuster zum Aufnehmen von Daten aus dem Cloudobjektspeicher mithilfe von AutoLadeprogramm und COPY INTO, Streamingdatensätzen von Kafka und SQL-Befehlen zum Erstellen neuer Tabellen mit Variantendaten oder Einfügen neuer Datensätze mithilfe des Variantentyps bereitgestellt. In der folgenden Tabelle sind die unterstützten Dateiformate und die Unterstützung der Databricks-Runtime-Version zusammengefasst:

Dateiformat Unterstützte Databricks-Runtime-Version
JSON 15.3 und höher
XML 16.4 und höher
CSV-Datei 16.4 und höher

Siehe Abfragevariantendaten.

Erstellen einer Tabelle mit einer Variant-Spalte

VARIANT ist ein standardmäßiger SQL-Typ in Databricks Runtime 15.3 und höher und wird von Tabellen unterstützt, die von Delta Lake unterstützt werden. Verwaltete Tabellen in Azure Databricks verwenden Delta Lake standardmäßig, sodass Sie eine leere Tabelle mit einer einzelnen VARIANT Spalte mit der folgenden Syntax erstellen können:

CREATE TABLE table_name (variant_column VARIANT)

Alternativ können Sie die PARSE_JSON Funktion für eine JSON-Zeichenfolge oder die FROM_XML Funktion in einer XML-Zeichenfolge verwenden, um eine CTAS-Anweisung zum Erstellen einer Tabelle mit einer Variant-Spalte zu verwenden. Im folgenden Beispiel wird eine Tabelle mit zwei Spalten erstellt:

  • Die id Spalte wurde aus der JSON-Zeichenfolge extrahiert und als STRING Typ ausgewiesen.
  • Die variant_column Spalte enthält die gesamte JSON-Zeichenfolge, die als VARIANT Typ codiert ist.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Hinweis

Databricks empfiehlt das Extrahieren und Speichern von Feldern als Nicht-Variant-Spalten, die Sie verwenden möchten, um Abfragen zu beschleunigen und das Speicherlayout zu optimieren.

VARIANT Spalten können nicht für Clusterschlüssel, Partitionen oder Z-Reihenfolge-Schlüssel verwendet werden. Der VARIANT Datentyp kann nicht für Vergleiche, Gruppierung, Sortierung und Festlegen von Vorgängen verwendet werden. Eine vollständige Liste der Einschränkungen finden Sie unter "Einschränkungen".

Einfügen von Daten mithilfe von parse_json

Wenn die Zieltabelle bereits eine Spalte codiert als VARIANT enthält, können Sie parse_json benutzen, um JSON-Zeichenfolgeneinträge als VARIANT einzufügen, wie im folgenden Beispiel:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Einfügen von Daten mithilfe von from_xml

Wenn die Zieltabelle bereits eine Spalte enthält, die als VARIANT codiert ist, können Sie from_xml verwenden, um XML-Zeichenfolgeneinträge als VARIANT einzufügen. Beispiel:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Einfügen von Daten mithilfe von from_csv

Wenn die Zieltabelle bereits eine Spalte enthält, die als VARIANT codiert ist, können Sie from_csv verwenden, um XML-Zeichenfolgeneinträge als VARIANT einzufügen. Beispiel:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Aufnehmen von Daten aus Cloudobjektspeicher als Variante

Das automatische Laden kann verwendet werden, um alle Daten aus den unterstützten Dateiquellen als einzelne VARIANT Spalte in einer Zieltabelle zu laden. Da VARIANT flexibel für Schema- und Typänderungen ist und die Groß-/Kleinschreibung und NULL-Werte in der Datenquelle verwaltet, ist dieses Muster robust für die meisten Aufnahmeszenarios mit den folgenden Einschränkungen:

  • Falsch formatierte Datensätze können nicht mithilfe des VARIANT Typs codiert werden.
  • Der VARIANT-Typ kann Datensätze nur bis zu 16 MB groß halten.

Hinweis

Variant behandelt übermäßig große Datensätze, ähnlich wie beschädigte Datensätze. Im Standardmodus PERMISSIVE werden übermäßig große Datensätze im corruptRecordColumn erfasst.

Da der gesamte Datensatz als einzelne VARIANT Spalte aufgezeichnet wird, tritt während der Aufnahme keine Schemaentwicklung auf und rescuedDataColumn wird nicht unterstützt. Im folgenden Beispiel wird davon ausgegangen, dass die Zieltabelle bereits mit einer einzelnen VARIANT Spalte vorhanden ist.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Sie können auch VARIANT angeben, wenn Sie ein Schema definieren oder schemaHints übergeben. Die Daten im Referenzquellfeld müssen einen gültigen Datensatz enthalten. Die folgenden Beispiele veranschaulichen diese Syntax:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Verwendung von COPY INTO mit Variante

Databricks empfiehlt die Verwendung von Auto Loader anstelle von COPY INTO, falls vorhanden.

COPY INTO unterstützt das Aufnehmen des gesamten Inhalts einer unterstützten Datenquelle als einzelne Spalte. Im folgenden Beispiel wird eine neue Tabelle mit einer einzelnen VARIANT Spalte erstellt und dann zum Erfassen von Datensätzen aus einer JSON-Dateiquelle COPY INTO verwendet.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Stream Kafka-Daten als Variante

Viele Kafka-Streams codieren ihre Nutzlasten mithilfe von JSON. Das Aufnehmen von Kafka-Datenströmen mithilfe von VARIANT macht diese Workloads robust gegenüber Schemaänderungen.

Das folgende Beispiel veranschaulicht das Lesen einer Kafka-Streamingquelle, das Umwandeln des key-Typs als STRING und value als VARIANT und das Schreiben in eine Zieltabelle.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)