from_avro

Wandelt eine binäre Spalte des Avro-Formats in den entsprechenden Katalysatorwert um. Das angegebene Schema muss mit den Lesedaten übereinstimmen, andernfalls ist das Verhalten nicht definiert: Es schlägt fehl oder gibt ein beliebiges Ergebnis zurück.

Wenn jsonFormatSchema nicht angegeben, aber beide subject bereitgestellt werden, schemaRegistryAddress konvertiert die Funktion eine binäre Spalte des Schema Registry Avro-Formats in ihren entsprechenden Katalysatorwert.

Syntax

from pyspark.sql.avro.functions import from_avro

from_avro(data, jsonFormatSchema=None, options=None, subject=None, schemaRegistryAddress=None)

Parameter

Parameter Typ Beschreibung
data pyspark.sql.Column oder str Die binäre Spalte mit avrocodierten Daten.
jsonFormatSchema str, optional Das Avro-Schema im JSON-Zeichenfolgenformat.
options Diktat, optional Optionen zum Steuern der Analyse und Konfiguration des Avro-Eintrags für den Schemaregistrierungsclient.
subject str, optional Der Betreff in der Schemaregistrierung, zu dem die Daten gehören.
schemaRegistryAddress str, optional Die Adresse (Host und Port) der Schemaregistrierung.

Optionen

Auswahl Werte Beschreibung
mode FAILFAST, PERMISSIVE Fehlerbehandlungsmodus. Standardwert: FAILFAST. Im PERMISSIVE Modus werden beschädigte Datensätze anstelle eines Fehlers festgelegt NULL .
compression uncompressed, , snappydeflate, bzip2, , xzzstandard Komprimierungscodec zum Codieren von Avro-Daten.
avroSchemaEvolutionMode none, restart Schemaentwicklungsmodus. Standardwert: none. Bei Festlegung auf restartwird eine UnknownFieldException Abfrage ausgelöst, wenn sich das Schema ändert. Starten Sie den Auftrag neu, um das neue Schema zu verwenden. Siehe Verwenden des Schemaentwicklungsmodus mit from_avro.
recursiveFieldMaxDepth Bereich: -1 bis 15 Maximale Rekursionstiefe entlang eines einzelnen rekursiven Pfads. Standard: -1, wodurch die Rekursionstiefe nicht eingeschränkt wird.
Wenn ein freigegebener Typ aus vielen unterschiedlichen Schemapfaden erreichbar ist, kann die Schemaerweiterung dazu führen, dass der Treiber nicht genügend Arbeitsspeicher hat, da diese Option nur eine Tiefe auf einem Pfad begrenzt. So umgehen Sie Folgendes:

Rückkehr

pyspark.sql.Column: Eine neue Spalte, die die deserialisierten Avro-Daten als entsprechenden Katalysatorwert enthält.

Beispiele

Beispiel 1: Deserialisieren einer Avro-Binärspalte mithilfe eines JSON-Schemas

from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro

data = [(1, Row(age=2, name='Alice'))]
df = spark.createDataFrame(data, ("key", "value"))
avro_df = df.select(to_avro(df.value).alias("avro"))
json_format_schema = '''{"type":"record","name":"topLevelRecord","fields":
    [{"name":"avro","type":[{"type":"record","name":"value",
    "namespace":"topLevelRecord","fields":[{"name":"age","type":["long","null"]},
    {"name":"name","type":["string","null"]}]},"null"]}]}'''
avro_df.select(from_avro(avro_df.avro, json_format_schema).alias("value")).show(truncate=False)
+------------------+
|value             |
+------------------+
|{{2, Alice}}      |
+------------------+