Soubor Avro
Apache Avro je systém serializace dat. Avro poskytuje:
- Bohaté datové struktury.
- Kompaktní, rychlý binární datový formát.
- Soubor kontejneru pro ukládání trvalých dat.
- Vzdálené volání procedur (RPC).
- Jednoduchá integrace s dynamickými jazyky Generování kódu se nevyžaduje ke čtení nebo zápisu datových souborů ani k použití ani implementaci protokolů RPC. Generování kódu jako volitelná optimalizace stojí za to implementovat pouze pro staticky napsané jazyky.
- Převod schématu: Automatický převod mezi záznamy Apache Spark SQL a Avro
- Dělení: Snadné čtení a zápis dělených dat bez další konfigurace.
- Komprese: Komprese, která se má použít při zápisu Avro na disk. Podporované typy jsou
uncompressed
,snappy
adeflate
. Můžete také zadat úroveň deflate. - Názvy záznamů: Název záznamu a obor názvů předáním mapy parametrů s
recordName
arecordNamespace
.
Viz také čtení a zápis streamovaných dat Avro.
Konfigurace
Chování zdroje dat Avro můžete změnit pomocí různých parametrů konfigurace.
Pokud chcete při čtení ignorovat soubory bez přípony .avro
, můžete parametr nastavit avro.mapred.ignore.inputs.without.extension
v konfiguraci Hadoopu. Výchozí hodnota je false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Pokud chcete při zápisu nakonfigurovat kompresi, nastavte následující vlastnosti Sparku:
- Komprese kodeku:
spark.sql.avro.compression.codec
. Podporované kodeky jsousnappy
adeflate
. Výchozí kodek jesnappy
. - Pokud je
deflate
kodek komprese , můžete nastavit úroveň komprese pomocí:spark.sql.avro.deflate.level
. Výchozí úroveň je-1
.
Tyto vlastnosti můžete nastavit v konfiguraci Sparku clusteru nebo za běhu pomocí spark.conf.set()
. Příklad:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Pro Databricks Runtime 9.1 LTS a vyšší můžete změnit výchozí chování odvozování schématu v Avro tím, že při čtení souborů poskytnete mergeSchema
možnost. Nastavení mergeSchema
pro true
odvození schématu ze sady souborů Avro v cílovém adresáři a jejich sloučení místo odvození schématu čtení z jednoho souboru.
Podporované typy pro převod Avro –> Převod Spark SQL
Tato knihovna podporuje čtení všech typů Avro. Používá následující mapování z typů Avro na typy Spark SQL:
Typ Avro | Typ Spark SQL |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float (číslo s plovoucí řádovou čárkou) | FloatType |
double | DoubleType |
bajtů | BinaryType |
string | StringType |
záznam | Typ struktury |
enum | StringType |
pole | ArrayType |
map | MapType |
stabilní | BinaryType |
sjednocení | Viz typy Sjednocení. |
Typy sjednocení
Zdroj dat Avro podporuje typy čtení union
. Avro považuje následující tři typy za union
typy:
union(int, long)
mapuje naLongType
.union(float, double)
mapuje naDoubleType
.union(something, null)
, kdesomething
je jakýkoli podporovaný typ Avro. Tato možnost se mapuje na stejný typ Spark SQL jako typ sparkusomething
s nastaveným parametremnullable
true
.
Všechny ostatní union
typy jsou komplexní typy. Mapují se na StructType
místo, kde jsou member0
názvy polí , member1
a tak dále, v souladu se členy union
. To je konzistentní s chováním při převodu mezi Avro a Parquet.
Logické typy
Zdroj dat Avro podporuje čtení následujících logických typů Avro:
Logický typ Avro | Typ Avro | Typ Spark SQL |
---|---|---|
datum | int | Typ data |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | stabilní | DecimalType |
decimal | bajtů | DecimalType |
Poznámka:
Zdroj dat Avro ignoruje dokumenty, aliasy a další vlastnosti, které jsou přítomné v souboru Avro.
Podporované typy pro převod Spark SQL –> Avro
Tato knihovna podporuje zápis všech typů Spark SQL do Avro. U většiny typů je mapování typů Sparku na typy Avro jednoduché (například IntegerType
se převede na int
); následuje seznam několika speciálních případů:
Typ Spark SQL | Typ Avro | Logický typ Avro |
---|---|---|
ByteType | int | |
Krátký typ | int | |
BinaryType | bajtů | |
DecimalType | stabilní | decimal |
TimestampType | long | timestamp-micros |
Typ data | int | datum |
Můžete také zadat celé výstupní schéma Avro s možností avroSchema
, aby se typy Spark SQL mohly převést na jiné typy Avro.
Následující převody se ve výchozím nastavení nepoužívají a vyžadují schéma Avro zadané uživatelem:
Typ Spark SQL | Typ Avro | Logický typ Avro |
---|---|---|
ByteType | stabilní | |
StringType | enum | |
DecimalType | bajtů | decimal |
TimestampType | long | timestamp-millis |
Příklady
Tyto příklady používají soubor epizods.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Tento příklad ukazuje vlastní schéma Avro:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Tento příklad ukazuje možnosti komprese Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Tento příklad ukazuje dělené záznamy Avro:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Tento příklad ukazuje název záznamu a obor názvů:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Pokud chcete dotazovat data Avro v SQL, zaregistrujte datový soubor jako tabulku nebo dočasné zobrazení:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Příklad poznámkového bloku: Čtení a zápis souborů Avro
Následující poznámkový blok ukazuje, jak číst a zapisovat soubory Avro.