Udostępnij za pośrednictwem


Typowe wzorce ładowania danych

Moduł automatycznego ładowania upraszcza szereg typowych zadań pozyskiwania danych. Ten szybki przewodnik zawiera przykłady kilku popularnych wzorców.

Filtrowanie katalogów lub plików przy użyciu wzorców glob

Wzorce globu mogą służyć do filtrowania katalogów i plików, gdy są podane w ścieżce.

Wzorzec opis
? Pasuje do dowolnego pojedynczego znaku
* Dopasowuje zero lub więcej znaków
[abc] Dopasuje pojedynczy znak z zestawu znaków {a,b,c}.
[a-z] Dopasuje pojedynczy znak z zakresu znaków {a... z}.
[^a] Dopasuje pojedynczy znak, który nie pochodzi z zestawu znaków lub zakresu {a}. Należy pamiętać, że ^ znak musi występować natychmiast po prawej stronie nawiasu otwierającego.
{ab,cd} Dopasuje ciąg z zestawu ciągów {ab, cd}.
{ab,c{de, fh}} Dopasuje ciąg z zestawu ciągów {ab, cde, cfh}.

Użyj elementu path do udostępniania wzorców prefiksów, na przykład:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Skala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Ważne

Należy użyć opcji pathGlobFilter jawnego udostępniania wzorców sufiksów. path zapewnia jedynie filtr prefiksu.

Jeśli na przykład chcesz przeanalizować tylko png pliki w katalogu zawierającym pliki z różnymi sufiksami, możesz wykonać następujące czynności:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Skala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Uwaga

Domyślne zachowanie funkcji Auto Loader opartej na globbing różni się od domyślnego zachowania innych źródeł plików Spark. Dodaj .option("cloudFiles.useStrictGlobber", "true") do odczytu, aby użyć funkcji globbing, która jest zgodna z domyślnym zachowaniem Spark względem źródeł plików. Zobacz następującą tabelę, aby uzyskać więcej informacji na temat globbingu:

Wzorzec Ścieżka pliku Domyślny globber Surowy globber
/a/b /a/b/c/file.txt Tak Tak
/a/b /a/b_dir/c/file.txt Nie Nie
/a/b /a/b.txt Nie Nie
/a/b/ /a/b.txt Nie Nie
/a/*/c/ /a/b/c/file.txt Tak Tak
/a/*/c/ /a/b/c/d/file.txt Tak Tak
/a/*/c/ /a/b/x/y/c/file.txt Tak Nie
"/a/*/c" /a/b/c_file.txt Tak Nie
/a/*/c/ /a/b/c_file.txt Tak Nie
/a/*/c/ /a/*/cookie/file.txt Tak Nie
/a/b* /a/b.txt Tak Tak
/a/b* /a/b/file.txt Tak Tak
/a/{0.txt,1.txt} /a/0.txt Tak Tak
/a/*/{0.txt,1.txt} /a/0.txt Nie Nie
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Tak Tak

Włączyć łatwe ETL

Łatwym sposobem wprowadzenia danych do Delta Lake bez utraty danych jest zastosowanie następującego wzorca i włączenie wnioskowania schematu za pomocą Auto Loader. Usługa Databricks zaleca uruchomienie następującego kodu w zadaniu usługi Azure Databricks w celu automatycznego ponownego uruchomienia strumienia po zmianie schematu danych źródłowych. Domyślnie schemat jest wywnioskowany jako typy łańcuchowe, wszelkie błędy analizowania (nie powinno ich być, jeżeli wszystko pozostanie jako łańcuch) zostaną skierowane do _rescued_data, a wszelkie nowe kolumny spowodują awarię strumienia i zmienią schemat.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Skala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Zapobieganie utracie danych w dobrze ustrukturyzowanych danych

Gdy znasz schemat, ale chcesz wiedzieć, kiedy otrzymujesz nieoczekiwane dane, usługa Databricks zaleca użycie rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Skala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jeśli chcesz, aby strumień przestał przetwarzać, jeśli wprowadzono nowe pole, które nie jest zgodne ze schematem, możesz dodać:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Umożliwienie elastycznych potoków danych częściowo ustrukturyzowanych

Gdy otrzymujesz dane od dostawcy, który wprowadza nowe kolumny do przekazywanych informacji, możesz nie być świadomy dokładnie, kiedy to robią, lub możesz nie mieć możliwości zaktualizowania swojego potoku danych. Teraz możesz użyć ewolucji schematu, aby ponownie uruchomić strumień i zezwolić automatycznemu modułowi ładującego zaktualizować wywnioskowany schemat. Możesz również skorzystać z schemaHints przy korzystaniu z niektórych nieschematycznych pól, które dostawca może udostępniać.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Skala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Przekształcanie zagnieżdżonych danych JSON

Ponieważ Auto Loader interpretuje kolumny JSON najwyższego poziomu jako ciągi znaków, mogą pozostać zagnieżdżone obiekty JSON, które trzeba dalej przekształcić. Można użyć interfejsów API dostępu do danych częściowo ustrukturyzowanych, aby dalej przekształcać złożoną zawartość JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Skala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Wnioskowanie zagnieżdżonych danych JSON

W przypadku zagnieżdżenia danych można użyć opcji cloudFiles.inferColumnTypes, aby wywnioskować zagnieżdżoną strukturę danych oraz inne typy kolumn.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Skala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Ładowanie plików CSV bez nagłówków

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Skala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Wymuszanie schematu w plikach CSV z nagłówkami

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Skala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Wprowadzanie obrazów lub danych binarnych do Delta Lake na potrzeby uczenia maszynowego

Gdy dane są przechowywane w usłudze Delta Lake, możesz uruchomić rozproszone wnioskowanie na danych. Zobacz Wykonywanie wnioskowania rozproszonego przy użyciu funkcji zdefiniowanych przez użytkownika (UDF) w bibliotece pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Skala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Składnia modułu automatycznego ładownia dla deklaratywnych potoków Lakeflow

Lakeflow Declarative Pipelines zapewnia nieco zmodyfikowaną składnię języka Python dla Auto Loader i dodaje obsługę SQL dla Auto Loader.

W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

Dla modułu ładującego automatycznego można użyć obsługiwanych opcji formatowania. Opcje dla read_files są parami klucz-wartość. Aby uzyskać szczegółowe informacje na temat obsługiwanych formatów i opcji, zobacz opcje .

Na przykład:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

Poniższy przykład odczytuje dane z plików CSV rozdzielanych tabulatorami z nagłówkiem:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

Możesz użyć schema, aby ręcznie określić format; Należy określić schema dla formatów, które nie obsługują wnioskowania schematu :

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

Uwaga

Potoki deklaratywne Lakeflow automatycznie konfigurują i zarządzają katalogami schematu i punktów kontrolnych podczas korzystania z Auto Loader do odczytywania plików. Jeśli jednak ręcznie skonfigurujesz jeden z tych katalogów, wykonywanie pełnego odświeżania nie ma wpływu na zawartość skonfigurowanych katalogów. Usługa Databricks zaleca używanie automatycznie skonfigurowanych katalogów, aby uniknąć nieoczekiwanych skutków ubocznych podczas przetwarzania.