Распространенные шаблоны загрузки данных

Автозагрузчик упрощает ряд распространенных задач приема данных. В этом кратком справочнике приведены примеры нескольких популярных шаблонов.

Фильтрация каталогов или файлов с помощью стандартных масок

Стандартные маски можно использовать для фильтрации каталогов и файлов, если они указаны в пути.

Расписание Description
? Соответствует любому одиночному знаку
* Соответствует нескольким символам или их отсутствию
[abc] Соответствует одиночному символу из кодировки {a, b, c}.
[a-z] Соответствует одиночному символу из диапазона символов {a…z}.
[^a] Соответствует одиночному символу, который не относится к кодировке или диапазону символов {a}. Обратите внимание, что символ ^ должен стоять непосредственно справа от открывающей скобки.
{ab,cd} Соответствует строке из набора строк {ab, cd}.
{ab,c{de, fh}} Соответствует строке из набора строк {ab, cde, cfh}.

Используйте path для предоставления шаблонов префиксов, например:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Важно!

Для явного предоставления шаблонов суффиксов необходимо использовать параметр pathGlobFilter. path предоставляет только фильтр префиксов.

Например, если вы хотите проанализировать только файлы png в каталоге, содержащем файлы с разными суффиксами, можно выполнить указанные ниже команды.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Примечание.

Поведение автозагрузчика по умолчанию отличается от поведения по умолчанию других источников файлов Spark. Добавьте .option("cloudFiles.useStrictGlobber", "true") в чтение, чтобы использовать глоббинг, соответствующий по умолчанию поведению Spark в источниках файлов. Дополнительные сведения о глоббинге см. в следующей таблице:

Расписание Путь к файлу Globber по умолчанию Строгий глоббер
/a/b /a/b/c/file.txt Да Да
/a/b /a/b_dir/c/file.txt Нет Нет
/a/b /a/b.txt Нет Нет
/a/b/ /a/b.txt Нет Нет
/a/*/c/ /a/b/c/file.txt Да Да
/a/*/c/ /a/b/c/d/file.txt Да Да
/a/*/c/ /a/b/x/y/c/file.txt Да Нет
/a/*/c /a/b/c_file.txt Да Нет
/a/*/c/ /a/b/c_file.txt Да Нет
/a/*/c/ /a/*/cookie/file.txt Да Нет
/a/b* /a/b.txt Да Да
/a/b* /a/b/file.txt Да Да
/a/{0.txt,1.txt} /a/0.txt Да Да
/a/*/{0.txt,1.txt} /a/0.txt Нет Нет
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Да Да

Включение простого извлечения, преобразования и загрузки

Простой способ передать данные в Delta Lake без потери данных — использовать приведенный шаблон и включить вывод схемы с Автозагрузчиком. Databricks рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема выводится как строковые типы, любые ошибки синтаксического анализа (их не должно быть, если все данные останутся в виде строк) отправляются в _rescued_data, а любые новые столбцы приведут к сбою потока и развитию схемы.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Предотвращение потери данных в хорошо структурированных данных

Если вы знакомы со схемой, но хотите знать, когда будете получать непредвиденные данные, Databricks рекомендует использовать rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Если вы хотите, чтобы поток прекратил обработку при добавлении нового поля, которое не соответствует схеме, можно добавить:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Обеспечение гибких конвейеров с частично структурированными данными

При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда он это делают, или у вас может не быть достаточной пропускной способности для обновления конвейера данных. Теперь вы можете использовать развитие схемы для перезапуска потока и автоматического обновления выводимой схемы Автозагрузчиком. Можно также использовать schemaHints для некоторых полей "без схемы", которые может предоставить поставщик.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Преобразование вложенных данных JSON

Так как автозагрузчик выводит столбцы JSON верхнего уровня как строки, у вас могут остаться вложенные объекты JSON, требующие дальнейших преобразований. Вы можете использовать API доступа к частично структурированным данным для дальнейшего преобразования сложного содержимого JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Вывод вложенных данных JSON

При наличии вложенных данных можно использовать параметр cloudFiles.inferColumnTypes для вывода вложенной структуры данных и других типов столбцов.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Загрузка CSV-файлов без заголовков

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Применение схемы в CSV-файлах с заголовками

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Прием изображений или двоичных данных в Delta Lake для ML

После сохранения данных в Delta Lake можно выполнять распределенный вывод данных. Дополнительные сведения см. в разделе Выполнение распределенного вывода с помощью UDF pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Синтаксис автозагрузчика для DLT

Разностные динамические таблицы предоставляют немного измененный синтаксис Python для автозагрузчика, который добавляет поддержку SQL для автозагрузчика.

В следующих примерах для создания наборов данных из CSV- и JSON-файлов используется Автозагрузчик.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

С Автозагрузчиком можно использовать поддерживаемые параметры формата. map() С помощью функции можно передать параметры методуcloud_files(). Параметры — это пары "ключ-значение", где ключи и значения являются строками. Ниже описан синтаксис для работы с Автозагрузчиком в SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

В следующем примере показано, как из CSV-файлов с разделителями табуляции считываются данные с заголовком.

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Вы можете использовать schema для указания формата вручную; вы должны указать schema для форматов, которые не поддерживают вывод схемы.

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Примечание.

Разностные динамические таблицы автоматически настраивают каталоги схем и контрольных точек, а также управляют ими при использовании Автозагрузчика для чтения файлов. Однако если вы вручную настроите любой из этих каталогов, выполнение полного обновления не повлияет на содержимое настроенных каталогов. Для избежания непредвиденных побочных эффектов во время обработки, в Databricks рекомендуется использовать автоматически настроенные каталоги.