Sdílet prostřednictvím


Běžné vzory načítání dat

Automatický zavaděč zjednodušuje řadu běžných úloh příjmu dat. Tato rychlá referenční příručka obsahuje příklady pro několik oblíbených vzorů.

Filtrování adresářů nebo souborů pomocí vzorů globu

Vzory globu lze použít pro filtrování adresářů a souborů, pokud jsou v cestě k dispozici.

Vzor Popis
? Odpovídá jakémukoli jednomu znaku.
* Odpovídá nule nebo více znaků
[abc] Odpovídá jednomu znaku ze znakové sady {a,b,c}.
[a-z] Odpovídá jednomu znaku z rozsahu znaků {a... z}.
[^a] Odpovídá jednomu znaku, který není ze znakové sady nebo rozsahu {a}. Všimněte si, že ^ znak musí nastat okamžitě napravo od levé závorky.
{ab,cd} Odpovídá řetězci ze sady řetězců {ab, cd}.
{ab,c{de, fh}} Odpovídá řetězci ze sady řetězců {ab, cde, cfh}.

path Použijte k poskytování vzorů předpon, například:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Důležité

Musíte použít možnost pathGlobFilter explicitního poskytování vzorů přípon. Jediný path filtr předpony.

Pokud například chcete parsovat jenom png soubory v adresáři, který obsahuje soubory s různými příponami, můžete:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Poznámka:

Výchozí chování automatického zavaděče se liší od výchozího chování jiných zdrojů souborů Spark. Přidejte .option("cloudFiles.useStrictGlobber", "true") do čtení, abyste použili globbing, který odpovídá výchozímu chování Sparku vůči zdrojům souborů. Další informace o globbingu najdete v následující tabulce:

Vzor Cesta k souboru Výchozí globber Striktní globber
/a/b /a/b/c/file.txt Ano Ano
/a/b /a/b_dir/c/file.txt Ne Ne
/a/b /a/b.txt Ne Ne
/a/b/ /a/b.txt Ne Ne
/a/*/c/ /a/b/c/file.txt Ano Ano
/a/*/c/ /a/b/c/d/file.txt Ano Ano
/a/*/c/ /a/b/x/y/c/file.txt Ano Ne
/a/*/c /a/b/c_file.txt Ano Ne
/a/*/c/ /a/b/c_file.txt Ano Ne
/a/*/c/ /a/*/cookie/file.txt Ano Ne
/a/b* /a/b.txt Ano Ano
/a/b* /a/b/file.txt Ano Ano
/a/{0.txt,1.txt} /a/0.txt Ano Ano
/a/*/{0.txt,1.txt} /a/0.txt Ne Ne
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Ano Ano

Povolení snadného ETL

Snadný způsob, jak dostat data do Delta Lake bez ztráty dat, je použít následující vzor a povolit odvozování schématu pomocí automatického zavaděče. Databricks doporučuje spustit následující kód v úloze Azure Databricks, aby automaticky restartoval datový proud, když se změní schéma zdrojových dat. Ve výchozím nastavení se schéma odvodí jako typy řetězců, všechny chyby analýzy (pokud vše zůstane jako řetězec) přejde na _rescued_dataa všechny nové sloupce selžou datový proud a vyvíjejí schéma.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Prevence ztráty dat ve dobře strukturovaných datech

Když znáte schéma, ale chcete vědět, kdykoli obdržíte neočekávaná data, databricks doporučuje použít rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Pokud chcete, aby stream přestal zpracovávat, pokud se zavádí nové pole, které neodpovídá vašemu schématu, můžete přidat:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Povolení flexibilních částečně strukturovaných datových kanálů

Když dostáváte data od dodavatele, který do informací, které poskytují, zavádí nové sloupce, nemusíte vědět přesně, kdy to dělají, nebo možná nebudete mít šířku pásma pro aktualizaci datového kanálu. Teď můžete využít vývoj schématu k restartování datového proudu a nechat automatické zavaděče aktualizovat odvozené schéma automaticky. Můžete také využít schemaHints některá pole bez schématu, která může dodavatel poskytnout.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformace vnořených dat JSON

Vzhledem k tomu, že automatický zavaděč odvodí sloupce JSON nejvyšší úrovně jako řetězce, můžete nechat vnořené objekty JSON, které vyžadují další transformace. Pomocí částečně strukturovaných rozhraní API pro přístup k datům můžete dále transformovat složitý obsah JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Odvození vnořených dat JSON

Pokud máte vnořená data, můžete použít cloudFiles.inferColumnTypes možnost odvození vnořené struktury dat a dalších typů sloupců.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Načtení souborů CSV bez hlaviček

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Vynucení schématu u souborů CSV s hlavičkami

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingestování obrázků nebo binárních dat do Delta Lake pro ML

Jakmile jsou data uložená v Delta Lake, můžete na datech spustit distribuované odvozování. Viz Provádění distribuovaných odvozování pomocí funkce UDF pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Syntaxe automatického zavaděče pro DLT

Delta Live Tables poskytuje mírně upravenou syntaxi Pythonu pro automatické zavaděče přidává podporu SQL pro automatický zavaděč.

Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

U automatického zavaděče můžete použít podporované možnosti formátování. map() Pomocí funkce můžete metodě předat možnosticloud_files(). Možnosti jsou páry klíč-hodnota, kde klíče a hodnoty jsou řetězce. Následující popis syntaxe pro práci s automatickým zavaděčem v SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Následující příklad načte data ze souborů CSV oddělených tabulátory s hlavičkou:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Formát můžete schema zadat ručně. Musíte zadat schema formáty, které nepodporují odvozování schématu:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Poznámka:

Delta Live Tables automaticky konfiguruje a spravuje adresáře schématu a kontrolních bodů při použití automatického zavaděče ke čtení souborů. Pokud však ručně nakonfigurujete některý z těchto adresářů, provedení úplné aktualizace nemá vliv na obsah nakonfigurovaných adresářů. Databricks doporučuje používat automaticky nakonfigurované adresáře, aby se zabránilo neočekávaným vedlejším účinkům během zpracování.