Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Apache Avro to system serializacji danych. Firma Avro zapewnia:
- Rozbudowane struktury danych.
- Kompaktowy, szybki, binarny format danych.
- Plik kontenera do przechowywania trwałych danych.
- Zdalne wywołanie procedury (RPC).
- Prosta integracja z językami dynamicznymi. Generowanie kodu nie jest wymagane do odczytywania ani zapisywania plików danych ani używania ani implementowania protokołów RPC. Generowanie kodu jako opcjonalna optymalizacja, warto zaimplementować tylko w przypadku języków statycznie typiowanych.
- Konwersja schematu: automatyczna konwersja między rekordami Apache Spark SQL i Avro.
- Partycjonowanie: łatwe odczytywanie i zapisywanie partycjonowanych danych bez dodatkowej konfiguracji.
- Kompresja: kompresja używana podczas zapisywania avro na dysku. Obsługiwane typy to
uncompressed
,snappy
ideflate
. Można również określić poziom deflacji. - Nazwy rekordów: nazwa rekordu i przestrzeń nazw rekordów poprzez przekazanie mapy parametrów za pomocą
recordName
irecordNamespace
.
Zobacz również Odczytywanie i zapisywanie strumieniowych danych Avro.
Konfigurowanie
Zachowanie źródła danych Avro można zmienić przy użyciu różnych parametrów konfiguracji.
Aby zignorować pliki bez rozszerzenia .avro
podczas odczytywania, można ustawić parametr avro.mapred.ignore.inputs.without.extension
w konfiguracji usługi Hadoop. Wartość domyślna to false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Aby skonfigurować kompresję podczas pisania, ustaw następujące właściwości platformy Spark:
- Kodek kompresji:
spark.sql.avro.compression.codec
. Obsługiwane koderki tosnappy
ideflate
. Domyślny koder koderowy tosnappy
. - Jeśli koder kompresji jest
deflate
, można ustawić poziom kompresji na:spark.sql.avro.deflate.level
. Domyślnym poziomem jest-1
.
Te właściwości można ustawić w konfiguracji klastra Spark lub w czasie wykonywania przy użyciu spark.conf.set()
. Na przykład:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
W przypadku Databricks Runtime 9.1 LTS i nowszych można zmienić domyślne zachowanie wnioskowania schematu w Avro, podając opcję mergeSchema
przy odczycie plików. Ustawienie mergeSchema
na true
spowoduje wnioskowanie schematu z zestawu plików Avro w katalogu docelowym i scalanie ich, a nie wnioskowanie schematu odczytu z jednego pliku.
Obsługiwane typy dla Avro — konwersja Spark SQL
Ta biblioteka obsługuje odczyt wszystkich typów danych Avro. Używa następującego mapowania z typów Avro na typy Spark SQL:
Typ Avro | Typ Spark SQL |
---|---|
typ logiczny (boolowski) | Typ logiczny |
int (integer) | Typ Integer |
długi | LongType |
liczba zmiennoprzecinkowa | FloatType |
podwójny | PodwójnyTyp |
B | Typ Binarny |
ciąg znaków | TypCiągu |
rekord | Typ struktury |
wyliczenie | TypCiągu |
tablica | ArrayType |
mapa | Typ mapy |
naprawiony | Typ Binarny |
unia | Zobacz typy unii. |
Typy unii
Źródło danych Avro obsługuje odczyt typów union
. Avro uznaje następujące trzy typy za union
typy:
-
union(int, long)
odnosi się doLongType
. -
union(float, double)
odnosi się doDoubleType
. -
union(something, null)
, gdziesomething
jest dowolnym obsługiwanym typem Avro. To mapuje na ten sam typ Spark SQL, cosomething
, znullable
ustawionym natrue
.
Wszystkie inne union
typy są typami złożonymi. Mapuje się je na StructType
, gdzie nazwy pól to member0
, member1
itd., zgodnie z członkami union
. Jest to zgodne z zachowaniem podczas konwersji między Avro a Parquet.
Typy logiczne
Źródło danych Avro obsługuje odczytywanie następujących typów logicznych Avro:
Typ logiczny Avro | Typ Avro | Typ Spark SQL |
---|---|---|
data | int (integer) | Typ daty |
znacznik czasu-w milisekundach | długi | TypZnacznikaCzasu |
znacznik czasu-mikrosekundy | długi | TypZnacznikaCzasu |
dziesiętny | naprawiony | Typ dziesiętny |
dziesiętny | B | Typ dziesiętny |
Uwaga
Źródło danych Avro ignoruje dokumenty, aliasy i inne właściwości obecne w pliku Avro.
Obsługiwane typy dla obsługi Spark SQL —> konwersja Avro
Ta biblioteka obsługuje pisanie wszystkich typów spark SQL w usłudze Avro. W przypadku większości typów mapowanie z typów platformy Spark na typy Avro jest proste (na przykład IntegerType
jest konwertowane na int
); poniżej znajduje się lista kilku przypadków specjalnych:
Typ Spark SQL | Typ Avro | Typ logiczny Avro |
---|---|---|
Typ bajtu | int (integer) | |
ShortType | int (integer) | |
Typ Binarny | B | |
Typ dziesiętny | naprawiony | dziesiętny |
TypZnacznikaCzasu | długi | znacznik czasu-mikrosekundy |
Typ daty | int (integer) | data |
Można również określić cały schemat danych wyjściowych Avro z opcją avroSchema
, aby typy Spark SQL można przekonwertować na inne typy Avro.
Następujące konwersje nie są stosowane domyślnie i wymagają określonego przez użytkownika schematu Avro:
Typ Spark SQL | Typ Avro | Typ logiczny Avro |
---|---|---|
Typ bajtu | naprawiony | |
TypCiągu | wyliczenie | |
Typ dziesiętny | B | dziesiętny |
TypZnacznikaCzasu | długi | znacznik czasu-w milisekundach |
Przykłady
W tych przykładach użyto pliku episodes.avro .
Skala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
W tym przykładzie pokazano niestandardowy schemat Avro:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
W tym przykładzie przedstawiono opcje kompresji Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
W tym przykładzie pokazano partycjonowane rekordy Avro:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
W tym przykładzie przedstawiono nazwę rekordu i przestrzeń nazw:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Pyton
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Aby wysłać zapytanie do danych Avro w programie SQL, zarejestruj plik danych jako tabelę lub widok tymczasowy:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Przykład notatnika: odczytywanie i zapisywanie plików Avro
W poniższym notesie pokazano, jak odczytywać i zapisywać pliki Avro.