Udostępnij za pośrednictwem


Typowe wzorce ładowania danych

Moduł automatycznego ładowania upraszcza szereg typowych zadań pozyskiwania danych. Ta szybka dokumentacja zawiera przykłady dla kilku popularnych wzorców.

Filtrowanie katalogów lub plików przy użyciu wzorców glob

Wzorce globu mogą służyć do filtrowania katalogów i plików, jeśli podano w ścieżce.

Wzorzec opis
? Pasuje do dowolnego pojedynczego znaku
* Dopasuje zero lub więcej znaków
[abc] Dopasuje pojedynczy znak z zestawu znaków {a,b,c}.
[a-z] Dopasuje pojedynczy znak z zakresu znaków {a... z}.
[^a] Dopasuje pojedynczy znak, który nie pochodzi z zestawu znaków lub zakresu {a}. Należy pamiętać, że ^ znak musi występować natychmiast po prawej stronie nawiasu otwierającego.
{ab,cd} Dopasuje ciąg z zestawu ciągów {ab, cd}.
{ab,c{de, fh}} Dopasuje ciąg z zestawu ciągów {ab, cde, cfh}.

Użyj elementu path do udostępniania wzorców prefiksów, na przykład:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Ważne

Należy użyć opcji pathGlobFilter jawnego udostępniania wzorców sufiksów. Jedyną path wartością jest filtr prefiksu.

Jeśli na przykład chcesz przeanalizować tylko png pliki w katalogu zawierającym pliki z różnymi sufiksami, możesz wykonać następujące czynności:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Uwaga

Domyślne zachowanie funkcji automatycznego ładowania globbingowego różni się od domyślnego zachowania innych źródeł plików platformy Spark. Dodaj .option("cloudFiles.useStrictGlobber", "true") element do odczytu, aby użyć funkcji globbing, która jest zgodna z domyślnym zachowaniem platformy Spark względem źródeł plików. Zobacz następującą tabelę, aby uzyskać więcej informacji na temat tworzenia symboli globbingowych:

Wzorzec Ścieżka pliku Domyślny globber Surowy globber
/a/b /a/b/c/file.txt Tak Tak
/a/b /a/b_dir/c/file.txt Nie Nie
/a/b /a/b.txt Nie Nie
/a/b/ /a/b.txt Nie Nie
/a/*/c/ /a/b/c/file.txt Tak Tak
/a/*/c/ /a/b/c/d/file.txt Tak Tak
/a/*/c/ /a/b/x/y/c/file.txt Tak Nie
/a/*/c /a/b/c_file.txt Tak Nie
/a/*/c/ /a/b/c_file.txt Tak Nie
/a/*/c/ /a/*/cookie/file.txt Tak Nie
/a/b* /a/b.txt Tak Tak
/a/b* /a/b/file.txt Tak Tak
/a/{0.txt,1.txt} /a/0.txt Tak Tak
/a/*/{0.txt,1.txt} /a/0.txt Nie Nie
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Tak Tak

Włączanie łatwego etl

Łatwym sposobem uzyskania danych do usługi Delta Lake bez utraty danych jest użycie następującego wzorca i włączenie wnioskowania schematu za pomocą modułu ładującego automatycznego. Usługa Databricks zaleca uruchomienie następującego kodu w zadaniu usługi Azure Databricks w celu automatycznego ponownego uruchomienia strumienia po zmianie schematu danych źródłowych. Domyślnie schemat jest wywnioskowany jako typy ciągów, wszelkie błędy analizowania (nie powinno być żadnych, jeśli wszystko pozostanie jako ciąg) przejdzie do _rescued_data, a wszystkie nowe kolumny nie powiedzą się strumieniu i ewoluują schemat.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Zapobieganie utracie danych w dobrze ustrukturyzowanych danych

Jeśli znasz schemat, ale chcesz wiedzieć, kiedy otrzymujesz nieoczekiwane dane, usługa Databricks zaleca użycie polecenia rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jeśli chcesz, aby strumień przestał przetwarzać, jeśli wprowadzono nowe pole, które nie jest zgodne ze schematem, możesz dodać:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Włączanie elastycznych potoków danych częściowo ustrukturyzowanych

Gdy otrzymujesz dane od dostawcy, który wprowadza nowe kolumny do podanych informacji, możesz nie być świadomy dokładnie tego, kiedy to robią, lub nie masz przepustowości aktualizacji potoku danych. Teraz możesz użyć ewolucji schematu, aby ponownie uruchomić strumień i zezwolić automatycznemu modułowi ładującego zaktualizować wywnioskowany schemat. Możesz również skorzystać schemaHints z niektórych pól "bez schematu", które dostawca może dostarczać.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Przekształcanie zagnieżdżonych danych JSON

Ponieważ moduł automatycznego ładowania wywnioskuje kolumny JSON najwyższego poziomu jako ciągi, można pozostawić z zagnieżdżonych obiektów JSON, które wymagają dalszych przekształceń. Interfejsy API dostępu do danych częściowo ustrukturyzowanych umożliwiają dalsze przekształcanie złożonej zawartości JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Wnioskowanie zagnieżdżonych danych JSON

W przypadku zagnieżdżenia danych można użyć cloudFiles.inferColumnTypes opcji wnioskowania zagnieżdżonej struktury danych i innych typów kolumn.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Ładowanie plików CSV bez nagłówków

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Wymuszanie schematu w plikach CSV z nagłówkami

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Pozyskiwanie obrazów lub danych binarnych do usługi Delta Lake dla uczenia maszynowego

Gdy dane są przechowywane w usłudze Delta Lake, możesz uruchomić rozproszone wnioskowanie na danych. Zobacz Wykonywanie wnioskowania rozproszonego przy użyciu funkcji zdefiniowanej przez użytkownika biblioteki pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Składnia automatycznego modułu ładującego dla biblioteki DLT

Funkcja Delta Live Tables zapewnia nieco zmodyfikowaną składnię języka Python dla automatycznego modułu ładującego dodaje obsługę języka SQL dla modułu ładującego automatycznego.

W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map() Za pomocą funkcji można przekazać opcje do cloud_files() metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Poniższy przykład odczytuje dane z plików CSV rozdzielanych tabulatorami z nagłówkiem:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Możesz użyć schema polecenia , aby ręcznie określić format. Należy określić schema format dla formatów, które nie obsługują wnioskowania schematu:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Uwaga

Delta Live Tables automatycznie konfiguruje schemat i katalogi punktów kontrolnych oraz zarządza nimi podczas używania automatycznego modułu ładującego do odczytywania plików. Jeśli jednak ręcznie skonfigurujesz jeden z tych katalogów, wykonywanie pełnego odświeżania nie ma wpływu na zawartość skonfigurowanych katalogów. Usługa Databricks zaleca używanie automatycznie skonfigurowanych katalogów, aby uniknąć nieoczekiwanych skutków ubocznych podczas przetwarzania.