Gyakori adatbetöltési minták
Az Automatikus betöltő számos gyakori adatbetöltési feladatot egyszerűsít. Ez a rövid útmutató számos népszerű mintára mutat be példákat.
Könyvtárak vagy fájlok szűrése glob mintákkal
A Glob-minták a címtárak és fájlok szűréséhez használhatók, ha az elérési úton meg van adva.
Minta | Leírás |
---|---|
? |
Egyetlen karakternek felel meg |
* |
Nulla vagy több karakter egyezése |
[abc] |
Egyetlen karakternek felel meg a(z) {a,b,c} karakterkészletből. |
[a-z] |
Egyetlen karaktert egyezik a(z) {a... karaktertartományból z}. |
[^a] |
Egyetlen karaktert egyezik meg, amely nem a(z) {a} karakterkészletből vagy tartományból származik. Vegye figyelembe, hogy a ^ karakternek közvetlenül a nyitó zárójel jobb oldalán kell lennie. |
{ab,cd} |
Megfelel a(z) {ab, cd} sztringkészlet egyik sztringjének. |
{ab,c{de, fh}} |
Megfelel a(z) {ab, cde, cfh} sztringhalmazból származó sztringnek. |
Használja az path
előtagmintákat, például:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Fontos
Az utótagminták explicit megadására szolgáló lehetőséget pathGlobFilter
kell használnia. Az path
egyetlen előtagszűrő.
Ha például csak png
olyan fájlokat szeretne elemezni egy könyvtárban, amely különböző utótagokkal rendelkező fájlokat tartalmaz, a következőket teheti:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Feljegyzés
Az automatikus betöltő alapértelmezett zúgási viselkedése eltér a többi Spark-fájlforrás alapértelmezett viselkedésétől. Adja hozzá .option("cloudFiles.useStrictGlobber", "true")
az olvasáshoz az alapértelmezett Spark-viselkedésnek megfelelő globbingot a fájlforrásokhoz. A globbingról az alábbi táblázatban talál további információt:
Minta | Fájl elérési útja | Alapértelmezett globber | Szigorú globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Igen | Igen |
/a/b | /a/b_dir/c/file.txt | Nem | Nem |
/a/b | /a/b.txt | Nem | Nem |
/a/b/ | /a/b.txt | Nem | Nem |
/a/*/c/ | /a/b/c/file.txt | Igen | Igen |
/a/*/c/ | /a/b/c/d/file.txt | Igen | Igen |
/a/*/c/ | /a/b/x/y/c/file.txt | Igen | Nem |
/a/*/c | /a/b/c_file.txt | Igen | Nem |
/a/*/c/ | /a/b/c_file.txt | Igen | Nem |
/a/*/c/ | /a/*/cookie/file.txt | Igen | Nem |
/a/b* | /a/b.txt | Igen | Igen |
/a/b* | /a/b/file.txt | Igen | Igen |
/a/{0.txt,1.txt} | /a/0.txt | Igen | Igen |
/a/*/{0.txt,1.txt} | /a/0.txt | Nem | Nem |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Igen | Igen |
Egyszerű ETL engedélyezése
Az adatok a Delta Lake-be való beolvasásának egyszerű módja az alábbi minta használata és a sémakövetkezés engedélyezése az Automatikus betöltővel. A Databricks azt javasolja, hogy futtassa az alábbi kódot egy Azure Databricks-feladatban, hogy automatikusan újraindítsa a streamet, amikor a forrásadatok sémája megváltozik. Alapértelmezés szerint a séma sztringtípusokként van kikövetkeztetve, minden elemzési hiba (ha minden sztringként marad) a rendszer el fog indulni _rescued_data
, és minden új oszlop meghibásodik a streamben, és fejleszti a sémát.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Adatvesztés megakadályozása jól strukturált adatokban
Ha ismeri a sémát, de szeretné tudni, hogy mikor kap váratlan adatokat, a Databricks a .rescuedDataColumn
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Ha azt szeretné, hogy a stream leálljon a feldolgozással, ha olyan új mező van beállítva, amely nem felel meg a sémának, hozzáadhatja a következőt:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Rugalmas, félig strukturált adatfolyamok engedélyezése
Ha olyan szállítótól kap adatokat, amelyek új oszlopokat vezetnek be az általuk megadott információkhoz, előfordulhat, hogy nem tudja pontosan, hogy mikor teszik meg, vagy előfordulhat, hogy nem rendelkezik az adatfolyam frissítéséhez szükséges sávszélességgel. Most már használhatja a sémafejlődést a stream újraindításához, és engedélyezheti, hogy az Automatikus betöltő automatikusan frissítse a következtetett sémát. A szállító által megadott "séma nélküli" mezők némelyikét is használhatja schemaHints
.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Beágyazott JSON-adatok átalakítása
Mivel az Automatikus betöltő sztringként a legfelső szintű JSON-oszlopokat keresi, a további átalakításokat igénylő beágyazott JSON-objektumokkal is maradhat. A félig strukturált adatelérési API-k segítségével tovább alakíthatja az összetett JSON-tartalmakat.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Beágyazott JSON-adatok következtetése
Ha beágyazott adatokkal rendelkezik, azzal a cloudFiles.inferColumnTypes
lehetőséggel következtethet az adatok és más oszloptípusok beágyazott szerkezetére.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
CSV-fájlok betöltése fejlécek nélkül
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Séma kényszerítése fejlécekkel rendelkező CSV-fájlokon
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Kép- vagy bináris adatok betöltése a Delta Lake for ML-be
Miután az adatokat a Delta Lake-ben tárolta, elosztott következtetést futtathat az adatokon. Lásd: Elosztott következtetés végrehajtása pandas UDF használatával.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
A DLT automatikus betöltő szintaxisa
A Delta Live Tables kissé módosított Python-szintaxist biztosít az Automatikus betöltőhöz, és sql-támogatást ad hozzá az automatikus betöltőhöz.
Az alábbi példák az Automatikus betöltő használatával hoznak létre adatkészleteket CSV- és JSON-fájlokból:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Az Automatikus betöltővel támogatott formátumbeállításokat használhat. A map()
függvény használatával megadhat beállításokat a cloud_files()
metódusnak. A beállítások kulcs-érték párok, ahol a kulcsok és az értékek sztringek. Az alábbiak az automatikus betöltő SQL-ben való használatának szintaxisát ismertetik:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Az alábbi példa tabulátorral tagolt CSV-fájlokból olvas be adatokat fejléccel:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
A formátumot manuálisan is megadhatjaschema
. Meg kell adnia azokat a formátumokat, amelyek nem támogatják a schema
sémakövetődést:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Feljegyzés
A Delta Live Tables automatikusan konfigurálja és kezeli a sémát és az ellenőrzőpont-könyvtárakat az Automatikus betöltő használatával a fájlok olvasásához. Ha azonban manuálisan konfigurálja valamelyik könyvtárat, a teljes frissítés végrehajtása nem befolyásolja a konfigurált könyvtárak tartalmát. A Databricks az automatikusan konfigurált címtárak használatát javasolja a váratlan mellékhatások elkerülése érdekében a feldolgozás során.