Avro-fájl
Az Apache Avro egy adat szerializálási rendszer. Az Avro a következőt biztosítja:
- Gazdag adatstruktúrák.
- Kompakt, gyors bináris adatformátum.
- Tárolófájl az állandó adatok tárolásához.
- Távoli eljáráshívás (RPC).
- Egyszerű integráció dinamikus nyelvekkel. A kódlétrehozás nem szükséges adatfájlok olvasásához vagy írásához, valamint RPC-protokollok használatához vagy implementálásához. A kódlétrehozás választható optimalizálásként, csak statikusan gépelt nyelvek esetén érdemes implementálni.
Az Avro-adatforrás a következőket támogatja:
- Sémakonvertálás: Az Apache Spark SQL és az Avro rekordok közötti automatikus átalakítás.
- Particionálás: A particionált adatok egyszerű olvasása és írása további konfiguráció nélkül.
- Tömörítés: Az Avro lemezre írásához használható tömörítés. A támogatott típusok a következők
uncompressed
: éssnappy
deflate
. Megadhatja a deflátumszintet is. - Rekordnevek: Rekordnév és névtér a paraméterek
recordName
recordNamespace
és a .
Lásd még a streamelési Avro-adatok olvasását és írását.
Konfiguráció
Az Avro-adatforrások viselkedését különböző konfigurációs paraméterekkel módosíthatja.
Ha a fájlokat a .avro
bővítmény nélkül szeretné figyelmen kívül hagyni olvasáskor, beállíthatja a paramétert avro.mapred.ignore.inputs.without.extension
a Hadoop-konfigurációban. Az alapértelmezett érték false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Ha írás közben szeretné konfigurálni a tömörítést, állítsa be a következő Spark-tulajdonságokat:
- Tömörítési kodek:
spark.sql.avro.compression.codec
. A támogatott kodekek a következőksnappy
: ésdeflate
. Az alapértelmezett kodek azsnappy
. - Ha a tömörítési kodek az
deflate
, a tömörítési szintet a következővel állíthatja be:spark.sql.avro.deflate.level
. Az alapértelmezett szint a .-1
Ezeket a tulajdonságokat beállíthatja a fürt Spark-konfigurációjában vagy futtatókörnyezetben a használatával spark.conf.set()
. Példa:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
A Databricks Runtime 9.1 LTS-hez és újabb verziókhoz módosíthatja az alapértelmezett sémakövető viselkedést az Avro-ban, ha lehetőséget ad a mergeSchema
fájlok olvasására. Ha mergeSchema
a célkönyvtárban lévő Avro-fájlok egy készletéből szeretne sémát kikövetkesíteni, és egyesíteni szeretné őket, ahelyett, hogy true
az olvasási sémát egyetlen fájlból következteti.
Az Avro által támogatott típusok –> Spark SQL-átalakítás
Ez a kódtár minden Avro-típus olvasását támogatja. A következő leképezést használja az Avro-típusok és a Spark SQL-típusok között:
Avro típusa | Spark SQL-típus |
---|---|
Logikai | Logikai típus |
egész | Egész számtípus |
hosszú | LongType |
float | FloatType |
double | DoubleType |
bájt | BinaryType |
húr | StringType |
rekord | StructType |
Enum | StringType |
array | ArrayType |
térkép | MapType |
fix | BinaryType |
unió | Lásd: Uniótípusok. |
Egyesítő típusok
Az Avro-adatforrás támogatja az olvasási union
típusokat. Az Avro a következő három típust tekinti típusnak union
:
union(int, long)
a térképet aLongType
union(float, double)
a térképet aDoubleType
union(something, null)
, aholsomething
minden támogatott Avro-típus. Ez ugyanazzal a Spark SQL-típussalnullable
lesz leképezvetrue
, mint asomething
következőhöz: .
Minden más union
típus összetett típus. A mezőnevek és így tovább a mezőnevek member0
member1
helyére vannak leképezve StructType
a union
tagoknak megfelelően. Ez összhangban van az Avro és a Parquet közötti konvertálás viselkedésével.
Logikai típusok
Az Avro-adatforrás a következő Avro logikai típusok olvasását támogatja:
Avro logikai típus | Avro típusa | Spark SQL-típus |
---|---|---|
dátum: | egész | DateType |
időbélyeg-millis | hosszú | Időbélyegtípus |
időbélyeg-mikrok | hosszú | Időbélyegtípus |
tizedes | fix | Decimális típus |
tizedes | bájt | Decimális típus |
Feljegyzés
Az Avro-adatforrás figyelmen kívül hagyja az Avro-fájlban található dokumentumokat, aliasokat és egyéb tulajdonságokat.
A Spark SQL támogatott típusai –> Avro-átalakítás
Ez a kódtár támogatja az összes Spark SQL-típus írását az Avro-ba. A legtöbb típus esetében a Spark-típusok és az Avro-típusok közötti leképezés egyszerű (például IntegerType
átalakítja int
a rendszer); az alábbiakban felsoroljuk a néhány speciális esetet:
Spark SQL-típus | Avro típusa | Avro logikai típus |
---|---|---|
ByteType | egész | |
ShortType | egész | |
BinaryType | bájt | |
Decimális típus | fix | tizedes |
Időbélyegtípus | hosszú | időbélyeg-mikrok |
DateType | egész | dátum: |
A teljes kimeneti Avro-sémát is megadhatja a beállítással avroSchema
, így a Spark SQL-típusok más Avro-típusokká alakíthatók.
A rendszer alapértelmezés szerint nem alkalmazza a következő átalakításokat, és felhasználó által megadott Avro-sémát igényel:
Spark SQL-típus | Avro típusa | Avro logikai típus |
---|---|---|
ByteType | fix | |
StringType | Enum | |
Decimális típus | bájt | tizedes |
Időbélyegtípus | hosszú | időbélyeg-millis |
Példák
Ezek a példák az episodes.avro fájlt használják.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Ez a példa egy egyéni Avro-sémát mutat be:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Ez a példa az Avro tömörítési lehetőségeit mutatja be:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Ez a példa a particionált Avro-rekordokat mutatja be:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Ez a példa a rekord nevét és névterét mutatja be:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Az Avro-adatok SQL-ben való lekérdezéséhez regisztrálja az adatfájlt táblaként vagy ideiglenes nézetként:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Példa jegyzetfüzetre: Avro-fájlok olvasása és írása
Az alábbi jegyzetfüzet bemutatja az Avro-fájlok olvasását és írását.