A séma következtetés és evolúció beállítása az automatikus betöltőben

Az Automatikus betöltőt úgy konfigurálhatja, hogy automatikusan észlelje a betöltött adatok sémáját, így anélkül inicializálhatja a táblákat, hogy explicit módon deklarálja az adatsémát, és új oszlopok bevezetésekor fejleszti a táblázatsémát. Ez szükségtelenné teszi a sémamódosítások manuális nyomon követését és alkalmazását az idő függvényében.

Az Automatikus betöltő egy JSON-bloboszlopban váratlanul (például eltérő adattípusokból) álló adatokat is "menthet", amelyeket később a félig strukturált adatelérési API-k használatával érhet el.

A sémakövetkeztetés és -fejlesztés a következő formátumokat támogatja:

Fájlformátum Támogatott verziók
JSON Az összes verzió
CSV Az összes verzió
XML Databricks Runtime 14.3 LTS és újabb verziók
Avro Databricks Runtime 10.4 LTS és újabb
Parquet Databricks Runtime 11.3 LTS és újabb verziók
ORC Támogatott
Text Nem alkalmazható (rögzített séma)
Binaryfile Nem alkalmazható (rögzített séma)

Sémakövetkeztetés és -fejlesztés szintaxisa

A cloudFiles.schemaLocation opció célkönyvtárának megadása lehetővé teszi a séma következtetését és fejlesztését. Választhatja, hogy ugyanazt a könyvtárat használja, amelyet a checkpointLocation. Delta Live Tables használata esetén az Azure Databricks automatikusan kezeli a séma helyét és egyéb ellenőrzőpont-információkat.

Feljegyzés

Ha egynél több forrásadathely van betöltve a céltáblába, minden automatikus betöltési számítási feladathoz külön streamelési ellenőrzőpont szükséges.

Az alábbi példa a parquetcloudFiles.format. Használjon csv, avrovagy json más fájlforrásokat. Minden más olvasási és írási beállítás változatlan marad az egyes formátumok alapértelmezett viselkedéséhez.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Hogyan működik az automatikus betöltő sémakövető következtetése?

Az adatok első olvasásakor a séma következtetéséhez az Automatikus betöltő az első felderített 50 GB-os vagy 1000-fájlokat mintázta, attól függően, hogy melyik korlát lépi át először a korlátot. Az Automatikus betöltő a sémaadatokat a konfigurált cloudFiles.schemaLocation könyvtárban _schemas tárolja a bemeneti adatok sémaváltozásainak nyomon követésére.

Feljegyzés

A használt minta méretének módosításához beállíthatja az SQL-konfigurációkat:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(bájtsztring, például 10gb)

és

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(egész szám)

Az automatikus betöltő sémakövetkeztetése alapértelmezés szerint a típuseltérések miatti sémafejlődési problémák elkerülésére törekszik. Olyan formátumok esetén, amelyek nem kódolnak adattípusokat (JSON, CSV és XML), az Automatikus betöltő az összes oszlopot sztringként (beleértve a JSON-fájlok beágyazott mezőit is) sztringként jelöli. A beírt sémával (Parquet és Avro) rendelkező formátumok esetében az Automatikus betöltő a fájlok egy részhalmazát mintázhatja, és egyesítheti az egyes fájlok sémáit. Ezt a viselkedést az alábbi táblázat foglalja össze:

Fájlformátum Alapértelmezett késleltetett adattípus
JSON Sztring
CSV Sztring
XML Sztring
Avro Az Avro-sémában kódolt típusok
Parquet Parquet-sémában kódolt típusok

Az Apache Spark DataFrameReader eltérő viselkedést használ a sémakövetkeztetésekhez, és mintaadatok alapján választja ki a JSON-, CSV- és XML-források oszlopainak adattípusait. Ha engedélyezni szeretné ezt a viselkedést az Automatikus betöltővel, állítsa a beállítást a következőre cloudFiles.inferColumnTypestrue: .

Feljegyzés

A CSV-adatok sémájának következtetésekor az Automatikus betöltő feltételezi, hogy a fájlok fejléceket tartalmaznak. Ha a CSV-fájlok nem tartalmaznak fejléceket, adja meg a lehetőséget .option("header", "false"). Az Automatikus betöltő emellett egyesíti a mintában szereplő összes fájl sémáját, hogy globális sémát állítson elő. Az automatikus betöltő ezután beolvassa az egyes fájlokat a fejléce alapján, és megfelelően elemzi a CSV-t.

Feljegyzés

Ha egy oszlop két Parquet-fájlban eltérő adattípussal rendelkezik, az Automatikus betöltő a legszélesebb típust választja. A sémahiba használatával felülbírálhatja ezt a lehetőséget. Ha sématippeket ad meg, az Automatikus betöltő nem a megadott típusra irányítja az oszlopot, hanem arra utasítja a Parquet-olvasót, hogy a megadott típusként olvassa be az oszlopot. Eltérés esetén az oszlop mentése a mentett adatoszlopban történik.

Hogyan működik az automatikus betöltő sémafejlődése?

Az Automatikus betöltő észleli az új oszlopok hozzáadását az adatok feldolgozása során. Amikor az Automatikus betöltő új oszlopot észlel, a stream egy UnknownFieldException. Mielőtt a stream ezt a hibát észleli, az Automatikus betöltő sémakövetkeztetést hajt végre a legújabb mikro-adatkötegen, és frissíti a séma helyét a legújabb sémával az új oszlopoknak a séma végéhez való egyesítésével. A meglévő oszlopok adattípusai változatlanok maradnak.

A Databricks azt javasolja, hogy az automatikus betöltő streameket munkafolyamatokkal konfigurálja, hogy az automatikusan újrainduljon az ilyen sémamódosítások után.

Az Automatikus betöltő a sémafejlődés alábbi módjait támogatja, amelyeket a beállításban cloudFiles.schemaEvolutionModeállított be:

Mód Új oszlop olvasásának viselkedése
addNewColumns (alapértelmezett) A stream sikertelen. A rendszer új oszlopokat ad hozzá a sémához. A meglévő oszlopok nem fejlesztik az adattípusokat.
rescue A séma soha nem fejlődik, és a stream nem hiúsul meg sémamódosítások miatt. Minden új oszlop a mentett adatoszlopban lesz rögzítve.
failOnNewColumns A stream sikertelen. A stream csak akkor indul újra, ha a megadott séma frissül, vagy a jogsértő adatfájl el lesz távolítva.
none Nem fejleszti a sémát, az új oszlopok figyelmen kívül lesznek hagyva, és az adatok mentése csak akkor történik meg, ha a rescuedDataColumn beállítás be van állítva. A stream nem hiúsul meg sémamódosítások miatt.

Hogyan működnek a partíciók az Auto Loaderrel?

Az Automatikus betöltő megkísérel partícióoszlopokat kinyerni az adatok mögöttes könyvtárszerkezetéből, ha az adatok Hive-stílusú particionálásban találhatók. A fájl elérési útja base_path/event=click/date=2021-04-01/f0.json például a partícióoszlopok következtetését date eredményezi event . Ha a mögöttes címtárstruktúra ütköző Hive-partíciókat tartalmaz, vagy nem tartalmaz Hive-stílusú particionálást, a rendszer figyelmen kívül hagyja a partícióoszlopokat.

A bináris fájlok (binaryFile) és text a fájlformátumok rögzített adatsémákat tartalmaznak, de támogatják a partícióoszlopok következtetését. A Databricks javasolja ezeknek a fájlformátumoknak a beállítását cloudFiles.schemaLocation . Ezzel elkerülhetők a lehetséges hibák vagy információvesztések, és megelőzhető a partícióoszlopok következtetése minden alkalommal, amikor egy automatikus betöltő elindul.

A partícióoszlopok nem tekinthetők sémafejlődésnek. Ha volt egy kezdeti könyvtárstruktúra, például base_path/event=click/date=2021-04-01/f0.json, majd elkezd új fájlokat fogadni, base_path/event=click/date=2021-04-01/hour=01/f1.jsonaz Automatikus betöltő figyelmen kívül hagyja az óra oszlopot. Az új partícióoszlopok adatainak rögzítéséhez állítsa be a következőt cloudFiles.partitionColumnsevent,date,hour: .

Feljegyzés

A beállítás cloudFiles.partitionColumns az oszlopnevek vesszővel tagolt listáját használja. Csak a címtárstruktúrában párként key=value létező oszlopok lesznek elemezve.

Mi a mentett adatoszlop?

Amikor az Automatikus betöltő a sémára következtet, a rendszer automatikusan hozzáad egy mentett adatoszlopot a sémához._rescued_data A beállítás rescuedDataColumnbeállításával átnevezheti az oszlopot, vagy belefoglalhatja az olyan esetekbe, amikor sémát ad meg.

A mentett adatoszlop gondoskodik arról, hogy a sémával nem egyező oszlopok ne legyenek elvetve. A mentett adatoszlop az alábbi okokból nem elemezhető adatokat tartalmaz:

  • Az oszlop hiányzik a sémából.
  • Típuseltérések.
  • A kis- és nagybetűk eltérései.

A mentett adatoszlop tartalmaz egy JSON-t, amely tartalmazza a mentett oszlopokat és a rekord forrásfájljának elérési útját.

Feljegyzés

A JSON- és CSV-elemzők három módot támogatnak a rekordok elemzésekor: PERMISSIVE, DROPMALFORMEDés FAILFAST. Ha együtt rescuedDataColumnhasználják, az adattípus eltérései nem okoznak rekordokat módban, DROPMALFORMED és nem jeleznek hibát FAILFAST módban. A rendszer csak sérült rekordokat dob el, vagy hibákat jelez, például hiányos vagy hibásan formázott JSON- vagy CSV-rekordokat. Ha JSON vagy CSV elemzésekor használja badRecordsPath , az adattípus eltérései nem tekinthetők rossz rekordnak a rescuedDataColumnhasználata során. A rendszer csak hiányos és hibás JSON- vagy CSV-rekordokat tárol.badRecordsPath

A kis- és nagybetűk megkülönböztetett viselkedésének módosítása

Ha a kis- és nagybetűk érzékenysége nincs engedélyezve, az oszlopok abcAbc, és ABC a sémakövetkeztetés szempontjából ugyanazt az oszlopot tekintik. A kiválasztott eset tetszőleges, és a mintavételezett adatoktól függ. Sématippekkel kényszerítheti ki, hogy melyik esetet kell használnia. Miután kiválasztotta a kijelölt elemeket, és a séma kikövetkeztetett, az Automatikus betöltő nem veszi figyelembe a nem a sémával konzisztensen kiválasztott burkolatvariánsokat.

Ha a mentett adatoszlop engedélyezve van, a rendszer betölti az oszlopba a sématól _rescued_data eltérő esetben elnevezett mezőket. Módosítsa ezt a viselkedést úgy, hogy a beállítást readerCaseSensitive hamis értékre állítja, amely esetben az Automatikus betöltő kis- és nagybetűk érzéketlen módon olvassa be az adatokat.

Sémakövetkeztetés felülbírálása sématippekkel

Sématippekkel kikényszerítheti azokat a sémainformációkat, amelyeket egy következtetett sémában ismer és vár. Ha tudja, hogy egy oszlop egy adott adattípushoz tartozik, vagy ha általánosabb adattípust szeretne választani (például double egy helyett integer), tetszőleges számú tippet adhat meg az oszlop adattípusaihoz sztringként az SQL-séma specifikációjának szintaxisával, például az alábbiakkal:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

A támogatott adattípusok listájához tekintse meg az adattípusok dokumentációját.

Ha egy oszlop nem található a stream elején, sématippekkel is hozzáadhatja az oszlopot a kikövetkeztetett sémához.

Íme egy példa egy következtetett sémára, amely sématippekkel szemlélteti a viselkedést.

Következtetett séma:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

A következő sématippek megadásával:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

a következőt kapja:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Feljegyzés

A tömb- és térképséma-tippek támogatása a Databricks Runtime 9.1 LTS-ben és újabb verziókban érhető el.

Íme egy példa egy összetett adattípusokkal rendelkező, következtetett sémára, amely sématippekkel látja a viselkedést.

Következtetett séma:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

A következő sématippek megadásával:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

a következőt kapja:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Feljegyzés

A sémamutatók csak akkor használhatók, ha nem biztosít sémát az Automatikus betöltő számára. Használhat sématippeket, függetlenül attól, hogy engedélyezve vagy letiltva van-e cloudFiles.inferColumnTypes .