Běžné vzory načítání dat
Automatický zavaděč zjednodušuje řadu běžných úloh příjmu dat. Tato rychlá referenční příručka obsahuje příklady pro několik oblíbených vzorů.
Filtrování adresářů nebo souborů pomocí vzorů globu
Vzory globu lze použít pro filtrování adresářů a souborů, pokud jsou v cestě k dispozici.
Vzor | Popis |
---|---|
? |
Odpovídá jakémukoli jednomu znaku. |
* |
Odpovídá nule nebo více znaků |
[abc] |
Odpovídá jednomu znaku ze znakové sady {a,b,c}. |
[a-z] |
Odpovídá jednomu znaku z rozsahu znaků {a... z}. |
[^a] |
Odpovídá jednomu znaku, který není ze znakové sady nebo rozsahu {a}. Všimněte si, že ^ znak musí nastat okamžitě napravo od levé závorky. |
{ab,cd} |
Odpovídá řetězci ze sady řetězců {ab, cd}. |
{ab,c{de, fh}} |
Odpovídá řetězci ze sady řetězců {ab, cde, cfh}. |
path
Použijte k poskytování vzorů předpon, například:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Důležité
Musíte použít možnost pathGlobFilter
explicitního poskytování vzorů přípon. Jediný path
filtr předpony.
Pokud například chcete parsovat jenom png
soubory v adresáři, který obsahuje soubory s různými příponami, můžete:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Poznámka:
Výchozí chování automatického zavaděče se liší od výchozího chování jiných zdrojů souborů Spark. Přidejte .option("cloudFiles.useStrictGlobber", "true")
do čtení, abyste použili globbing, který odpovídá výchozímu chování Sparku vůči zdrojům souborů. Další informace o globbingu najdete v následující tabulce:
Vzor | Cesta k souboru | Výchozí globber | Striktní globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Ano | Ano |
/a/b | /a/b_dir/c/file.txt | Ne | Ne |
/a/b | /a/b.txt | Ne | Ne |
/a/b/ | /a/b.txt | Ne | Ne |
/a/*/c/ | /a/b/c/file.txt | Ano | Ano |
/a/*/c/ | /a/b/c/d/file.txt | Ano | Ano |
/a/*/c/ | /a/b/x/y/c/file.txt | Ano | Ne |
/a/*/c | /a/b/c_file.txt | Ano | Ne |
/a/*/c/ | /a/b/c_file.txt | Ano | Ne |
/a/*/c/ | /a/*/cookie/file.txt | Ano | Ne |
/a/b* | /a/b.txt | Ano | Ano |
/a/b* | /a/b/file.txt | Ano | Ano |
/a/{0.txt,1.txt} | /a/0.txt | Ano | Ano |
/a/*/{0.txt,1.txt} | /a/0.txt | Ne | Ne |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Ano | Ano |
Povolení snadného ETL
Snadný způsob, jak dostat data do Delta Lake bez ztráty dat, je použít následující vzor a povolit odvozování schématu pomocí automatického zavaděče. Databricks doporučuje spustit následující kód v úloze Azure Databricks, aby automaticky restartoval datový proud, když se změní schéma zdrojových dat. Ve výchozím nastavení se schéma odvodí jako typy řetězců, všechny chyby analýzy (pokud vše zůstane jako řetězec) přejde na _rescued_data
a všechny nové sloupce selžou datový proud a vyvíjejí schéma.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Prevence ztráty dat ve dobře strukturovaných datech
Když znáte schéma, ale chcete vědět, kdykoli obdržíte neočekávaná data, databricks doporučuje použít rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Pokud chcete, aby stream přestal zpracovávat, pokud se zavádí nové pole, které neodpovídá vašemu schématu, můžete přidat:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Povolení flexibilních částečně strukturovaných datových kanálů
Když dostáváte data od dodavatele, který do informací, které poskytují, zavádí nové sloupce, nemusíte vědět přesně, kdy to dělají, nebo možná nebudete mít šířku pásma pro aktualizaci datového kanálu. Teď můžete využít vývoj schématu k restartování datového proudu a nechat automatické zavaděče aktualizovat odvozené schéma automaticky. Můžete také využít schemaHints
některá pole bez schématu, která může dodavatel poskytnout.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformace vnořených dat JSON
Vzhledem k tomu, že automatický zavaděč odvodí sloupce JSON nejvyšší úrovně jako řetězce, můžete nechat vnořené objekty JSON, které vyžadují další transformace. Pomocí částečně strukturovaných rozhraní API pro přístup k datům můžete dále transformovat složitý obsah JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Odvození vnořených dat JSON
Pokud máte vnořená data, můžete použít cloudFiles.inferColumnTypes
možnost odvození vnořené struktury dat a dalších typů sloupců.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Načtení souborů CSV bez hlaviček
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Vynucení schématu u souborů CSV s hlavičkami
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingestování obrázků nebo binárních dat do Delta Lake pro ML
Jakmile jsou data uložená v Delta Lake, můžete na datech spustit distribuované odvozování. Viz Provádění distribuovaných odvozování pomocí funkce UDF pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Syntaxe automatického zavaděče pro DLT
Delta Live Tables poskytuje mírně upravenou syntaxi Pythonu pro automatické zavaděče přidává podporu SQL pro automatický zavaděč.
Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
U automatického zavaděče můžete použít podporované možnosti formátování. map()
Pomocí funkce můžete metodě předat možnosticloud_files()
. Možnosti jsou páry klíč-hodnota, kde klíče a hodnoty jsou řetězce. Následující popis syntaxe pro práci s automatickým zavaděčem v SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Následující příklad načte data ze souborů CSV oddělených tabulátory s hlavičkou:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Formát můžete schema
zadat ručně. Musíte zadat schema
formáty, které nepodporují odvozování schématu:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Poznámka:
Delta Live Tables automaticky konfiguruje a spravuje adresáře schématu a kontrolních bodů při použití automatického zavaděče ke čtení souborů. Pokud však ručně nakonfigurujete některý z těchto adresářů, provedení úplné aktualizace nemá vliv na obsah nakonfigurovaných adresářů. Databricks doporučuje používat automaticky nakonfigurované adresáře, aby se zabránilo neočekávaným vedlejším účinkům během zpracování.
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro