Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Moduł automatycznego ładowania upraszcza szereg typowych zadań pozyskiwania danych. Ten szybki przewodnik zawiera przykłady kilku popularnych wzorców.
Filtrowanie katalogów lub plików przy użyciu wzorców glob
Wzorce globu mogą służyć do filtrowania katalogów i plików, gdy są podane w ścieżce.
Wzorzec | opis |
---|---|
? |
Pasuje do dowolnego pojedynczego znaku |
* |
Dopasowuje zero lub więcej znaków |
[abc] |
Dopasuje pojedynczy znak z zestawu znaków {a,b,c}. |
[a-z] |
Dopasuje pojedynczy znak z zakresu znaków {a... z}. |
[^a] |
Dopasuje pojedynczy znak, który nie pochodzi z zestawu znaków lub zakresu {a}. Należy pamiętać, że ^ znak musi występować natychmiast po prawej stronie nawiasu otwierającego. |
{ab,cd} |
Dopasuje ciąg z zestawu ciągów {ab, cd}. |
{ab,c{de, fh}} |
Dopasuje ciąg z zestawu ciągów {ab, cde, cfh}. |
Użyj elementu path
do udostępniania wzorców prefiksów, na przykład:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Skala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Ważne
Należy użyć opcji pathGlobFilter
jawnego udostępniania wzorców sufiksów.
path
zapewnia jedynie filtr prefiksu.
Jeśli na przykład chcesz przeanalizować tylko png
pliki w katalogu zawierającym pliki z różnymi sufiksami, możesz wykonać następujące czynności:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Skala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Uwaga
Domyślne zachowanie funkcji Auto Loader opartej na globbing różni się od domyślnego zachowania innych źródeł plików Spark. Dodaj .option("cloudFiles.useStrictGlobber", "true")
do odczytu, aby użyć funkcji globbing, która jest zgodna z domyślnym zachowaniem Spark względem źródeł plików. Zobacz następującą tabelę, aby uzyskać więcej informacji na temat globbingu:
Wzorzec | Ścieżka pliku | Domyślny globber | Surowy globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Tak | Tak |
/a/b | /a/b_dir/c/file.txt | Nie | Nie |
/a/b | /a/b.txt | Nie | Nie |
/a/b/ | /a/b.txt | Nie | Nie |
/a/*/c/ | /a/b/c/file.txt | Tak | Tak |
/a/*/c/ | /a/b/c/d/file.txt | Tak | Tak |
/a/*/c/ | /a/b/x/y/c/file.txt | Tak | Nie |
"/a/*/c" | /a/b/c_file.txt | Tak | Nie |
/a/*/c/ | /a/b/c_file.txt | Tak | Nie |
/a/*/c/ | /a/*/cookie/file.txt | Tak | Nie |
/a/b* | /a/b.txt | Tak | Tak |
/a/b* | /a/b/file.txt | Tak | Tak |
/a/{0.txt,1.txt} | /a/0.txt | Tak | Tak |
/a/*/{0.txt,1.txt} | /a/0.txt | Nie | Nie |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Tak | Tak |
Włączyć łatwe ETL
Łatwym sposobem wprowadzenia danych do Delta Lake bez utraty danych jest zastosowanie następującego wzorca i włączenie wnioskowania schematu za pomocą Auto Loader. Usługa Databricks zaleca uruchomienie następującego kodu w zadaniu usługi Azure Databricks w celu automatycznego ponownego uruchomienia strumienia po zmianie schematu danych źródłowych. Domyślnie schemat jest wywnioskowany jako typy łańcuchowe, wszelkie błędy analizowania (nie powinno ich być, jeżeli wszystko pozostanie jako łańcuch) zostaną skierowane do _rescued_data
, a wszelkie nowe kolumny spowodują awarię strumienia i zmienią schemat.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Skala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Zapobieganie utracie danych w dobrze ustrukturyzowanych danych
Gdy znasz schemat, ale chcesz wiedzieć, kiedy otrzymujesz nieoczekiwane dane, usługa Databricks zaleca użycie rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Skala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Jeśli chcesz, aby strumień przestał przetwarzać, jeśli wprowadzono nowe pole, które nie jest zgodne ze schematem, możesz dodać:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Umożliwienie elastycznych potoków danych częściowo ustrukturyzowanych
Gdy otrzymujesz dane od dostawcy, który wprowadza nowe kolumny do przekazywanych informacji, możesz nie być świadomy dokładnie, kiedy to robią, lub możesz nie mieć możliwości zaktualizowania swojego potoku danych. Teraz możesz użyć ewolucji schematu, aby ponownie uruchomić strumień i zezwolić automatycznemu modułowi ładującego zaktualizować wywnioskowany schemat. Możesz również skorzystać z schemaHints
przy korzystaniu z niektórych nieschematycznych pól, które dostawca może udostępniać.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Skala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Przekształcanie zagnieżdżonych danych JSON
Ponieważ Auto Loader interpretuje kolumny JSON najwyższego poziomu jako ciągi znaków, mogą pozostać zagnieżdżone obiekty JSON, które trzeba dalej przekształcić. Można użyć interfejsów API dostępu do danych częściowo ustrukturyzowanych, aby dalej przekształcać złożoną zawartość JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Skala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Wnioskowanie zagnieżdżonych danych JSON
W przypadku zagnieżdżenia danych można użyć opcji cloudFiles.inferColumnTypes
, aby wywnioskować zagnieżdżoną strukturę danych oraz inne typy kolumn.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Skala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Ładowanie plików CSV bez nagłówków
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Skala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Wymuszanie schematu w plikach CSV z nagłówkami
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Skala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Wprowadzanie obrazów lub danych binarnych do Delta Lake na potrzeby uczenia maszynowego
Gdy dane są przechowywane w usłudze Delta Lake, możesz uruchomić rozproszone wnioskowanie na danych. Zobacz Wykonywanie wnioskowania rozproszonego przy użyciu funkcji zdefiniowanych przez użytkownika (UDF) w bibliotece pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Skala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Składnia modułu automatycznego ładownia dla deklaratywnych potoków Lakeflow
Lakeflow Declarative Pipelines zapewnia nieco zmodyfikowaną składnię języka Python dla Auto Loader i dodaje obsługę SQL dla Auto Loader.
W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders/",
format => "json")
Dla modułu ładującego automatycznego można użyć obsługiwanych opcji formatowania. Opcje dla read_files
są parami klucz-wartość. Aby uzyskać szczegółowe informacje na temat obsługiwanych formatów i opcji, zobacz opcje .
Na przykład:
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)
Poniższy przykład odczytuje dane z plików CSV rozdzielanych tabulatorami z nagłówkiem:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv",
delimiter => "\t",
header => "true"
)
Możesz użyć schema
, aby ręcznie określić format; Należy określić schema
dla formatów, które nie obsługują wnioskowania schematu :
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
format => "parquet",
schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)
Uwaga
Potoki deklaratywne Lakeflow automatycznie konfigurują i zarządzają katalogami schematu i punktów kontrolnych podczas korzystania z Auto Loader do odczytywania plików. Jeśli jednak ręcznie skonfigurujesz jeden z tych katalogów, wykonywanie pełnego odświeżania nie ma wpływu na zawartość skonfigurowanych katalogów. Usługa Databricks zaleca używanie automatycznie skonfigurowanych katalogów, aby uniknąć nieoczekiwanych skutków ubocznych podczas przetwarzania.