Распространенные шаблоны загрузки данных

Автозагрузчик упрощает ряд распространенных задач приема данных. В этом кратком справочнике приведены примеры нескольких популярных шаблонов.

Фильтрация каталогов или файлов с помощью стандартных масок

Стандартные маски можно использовать для фильтрации каталогов и файлов, если они указаны в пути.

Шаблон Описание
? Соответствует любому одиночному знаку
* Соответствует нескольким символам или их отсутствию
[abc] Соответствует одиночному символу из кодировки {a, b, c}.
[a-z] Соответствует одиночному символу из диапазона символов {a…z}.
[^a] Соответствует одиночному символу, который не относится к кодировке или диапазону символов {a}. Обратите внимание, что символ ^ должен стоять непосредственно справа от открывающей скобки.
{ab,cd} Соответствует строке из набора строк {ab, cd}.
{ab,c{de, fh}} Соответствует строке из набора строк {ab, cde, cfh}.

Используйте path для предоставления шаблонов префиксов, например:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

Важно!

Для явного предоставления шаблонов суффиксов необходимо использовать параметр pathGlobFilter. path предоставляет только фильтр префиксов.

Например, если вы хотите проанализировать только файлы png в каталоге, содержащем файлы с разными суффиксами, можно выполнить указанные ниже команды.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

Включение простого извлечения, преобразования и загрузки

Простой способ передать данные в Delta Lake без потери данных — использовать приведенный шаблон и включить вывод схемы с Автозагрузчиком. Databricks рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема выводится как строковые типы, любые ошибки синтаксического анализа (их не должно быть, если все данные останутся в виде строк) отправляются в _rescued_data, а любые новые столбцы приведут к сбою потока и развитию схемы.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Предотвращение потери данных в хорошо структурированных данных

Если вы знакомы со схемой, но хотите знать, когда будете получать непредвиденные данные, Databricks рекомендует использовать rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Если вы хотите, чтобы поток прекратил обработку при добавлении нового поля, которое не соответствует схеме, можно добавить:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Обеспечение гибких конвейеров с частично структурированными данными

При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда он это делают, или у вас может не быть достаточной пропускной способности для обновления конвейера данных. Теперь вы можете использовать развитие схемы для перезапуска потока и автоматического обновления выводимой схемы Автозагрузчиком. Можно также использовать schemaHints для некоторых полей "без схемы", которые может предоставить поставщик.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Преобразование вложенных данных JSON

Так как автозагрузчик выводит столбцы JSON верхнего уровня как строки, у вас могут остаться вложенные объекты JSON, требующие дальнейших преобразований. Вы можете использовать API доступа к частично структурированным данным для дальнейшего преобразования сложного содержимого JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<source_data_with_nested_json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<source_data_with_nested_json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Вывод вложенных данных JSON

При наличии вложенных данных можно использовать параметр cloudFiles.inferColumnTypes для вывода вложенной структуры данных и других типов столбцов.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source_data_with_nested_json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source_data_with_nested_json>")

Загрузка CSV-файлов без заголовков

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Применение схемы в CSV-файлах с заголовками

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Прием изображений или двоичных данных в Delta Lake для ML

После сохранения данных в Delta Lake можно выполнять распределенный вывод данных. Дополнительные сведения см. в разделе Выполнение распределенного вывода с помощью UDF pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")