Megosztás a következőn keresztül:


Avro-fájl

Az Apache Avro egy adat szerializálási rendszer. Az Avro a következőt biztosítja:

  • Gazdag adatstruktúrák.
  • Kompakt, gyors bináris adatformátum.
  • Tárolófájl az állandó adatok tárolásához.
  • Távoli eljáráshívás (RPC).
  • Egyszerű integráció dinamikus nyelvekkel. A kódlétrehozás nem szükséges adatfájlok olvasásához vagy írásához, valamint RPC-protokollok használatához vagy implementálásához. A kódlétrehozás választható optimalizálásként, csak statikusan gépelt nyelvek esetén érdemes implementálni.

Az Avro-adatforrás a következőket támogatja:

  • Sémakonvertálás: Az Apache Spark SQL és az Avro rekordok közötti automatikus átalakítás.
  • Particionálás: A particionált adatok egyszerű olvasása és írása további konfiguráció nélkül.
  • Tömörítés: Az Avro lemezre írásához használható tömörítés. A támogatott típusok a következőkuncompressed: és snappydeflate. Megadhatja a deflátumszintet is.
  • Rekordnevek: Rekordnév és névtér a paraméterek recordName recordNamespaceés a .

Lásd még a streamelési Avro-adatok olvasását és írását.

Konfiguráció

Az Avro-adatforrások viselkedését különböző konfigurációs paraméterekkel módosíthatja.

Ha a fájlokat a .avro bővítmény nélkül szeretné figyelmen kívül hagyni olvasáskor, beállíthatja a paramétert avro.mapred.ignore.inputs.without.extension a Hadoop-konfigurációban. Az alapértelmezett érték false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Ha írás közben szeretné konfigurálni a tömörítést, állítsa be a következő Spark-tulajdonságokat:

  • Tömörítési kodek: spark.sql.avro.compression.codec. A támogatott kodekek a következők snappy : és deflate. Az alapértelmezett kodek az snappy.
  • Ha a tömörítési kodek az deflate, a tömörítési szintet a következővel állíthatja be: spark.sql.avro.deflate.level. Az alapértelmezett szint a .-1

Ezeket a tulajdonságokat beállíthatja a fürt Spark-konfigurációjában vagy futtatókörnyezetben a használatával spark.conf.set(). Példa:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

A Databricks Runtime 9.1 LTS-hez és újabb verziókhoz módosíthatja az alapértelmezett sémakövető viselkedést az Avro-ban, ha lehetőséget ad a mergeSchema fájlok olvasására. Ha mergeSchema a célkönyvtárban lévő Avro-fájlok egy készletéből szeretne sémát kikövetkesíteni, és egyesíteni szeretné őket, ahelyett, hogy true az olvasási sémát egyetlen fájlból következteti.

Az Avro által támogatott típusok –> Spark SQL-átalakítás

Ez a kódtár minden Avro-típus olvasását támogatja. A következő leképezést használja az Avro-típusok és a Spark SQL-típusok között:

Avro típusa Spark SQL-típus
Logikai Logikai típus
egész Egész számtípus
hosszú LongType
float FloatType
double DoubleType
bájt BinaryType
húr StringType
rekord StructType
Enum StringType
array ArrayType
térkép MapType
fix BinaryType
unió Lásd: Uniótípusok.

Egyesítő típusok

Az Avro-adatforrás támogatja az olvasási union típusokat. Az Avro a következő három típust tekinti típusnak union :

  • union(int, long)a térképet a LongType
  • union(float, double)a térképet a DoubleType
  • union(something, null), ahol something minden támogatott Avro-típus. Ez ugyanazzal a Spark SQL-típussal nullable lesz leképezvetrue, mint a somethingkövetkezőhöz: .

Minden más union típus összetett típus. A mezőnevek és így tovább a mezőnevek member0member1helyére vannak leképezve StructType a uniontagoknak megfelelően. Ez összhangban van az Avro és a Parquet közötti konvertálás viselkedésével.

Logikai típusok

Az Avro-adatforrás a következő Avro logikai típusok olvasását támogatja:

Avro logikai típus Avro típusa Spark SQL-típus
dátum: egész DateType
időbélyeg-millis hosszú Időbélyegtípus
időbélyeg-mikrok hosszú Időbélyegtípus
tizedes fix Decimális típus
tizedes bájt Decimális típus

Feljegyzés

Az Avro-adatforrás figyelmen kívül hagyja az Avro-fájlban található dokumentumokat, aliasokat és egyéb tulajdonságokat.

A Spark SQL támogatott típusai –> Avro-átalakítás

Ez a kódtár támogatja az összes Spark SQL-típus írását az Avro-ba. A legtöbb típus esetében a Spark-típusok és az Avro-típusok közötti leképezés egyszerű (például IntegerType átalakítja inta rendszer); az alábbiakban felsoroljuk a néhány speciális esetet:

Spark SQL-típus Avro típusa Avro logikai típus
ByteType egész
ShortType egész
BinaryType bájt
Decimális típus fix tizedes
Időbélyegtípus hosszú időbélyeg-mikrok
DateType egész dátum:

A teljes kimeneti Avro-sémát is megadhatja a beállítással avroSchema, így a Spark SQL-típusok más Avro-típusokká alakíthatók. A rendszer alapértelmezés szerint nem alkalmazza a következő átalakításokat, és felhasználó által megadott Avro-sémát igényel:

Spark SQL-típus Avro típusa Avro logikai típus
ByteType fix
StringType Enum
Decimális típus bájt tizedes
Időbélyegtípus hosszú időbélyeg-millis

Példák

Ezek a példák az episodes.avro fájlt használják.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Ez a példa egy egyéni Avro-sémát mutat be:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Ez a példa az Avro tömörítési lehetőségeit mutatja be:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Ez a példa a particionált Avro-rekordokat mutatja be:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Ez a példa a rekord nevét és névterét mutatja be:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Az Avro-adatok SQL-ben való lekérdezéséhez regisztrálja az adatfájlt táblaként vagy ideiglenes nézetként:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Példa jegyzetfüzetre: Avro-fájlok olvasása és írása

Az alábbi jegyzetfüzet bemutatja az Avro-fájlok olvasását és írását.

Avro-fájlok jegyzetfüzetének olvasása és írása

Jegyzetfüzet beszerzése