Поделиться через


Настройка развития и вывода схемы в автозагрузчике

Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных данных, позволяя инициализировать таблицы без явного объявления схемы данных и развивать схему таблицы в виде новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени.

Автозагрузчик также может "спасти" непредвиденные данные (например, отличающиеся типы данных) в столбец BLOB-объектов JSON, доступ к которым можно затем получить с помощью частично структурированных API доступа к данным.

Для вывода и развития схемы поддерживаются следующие форматы:

File format Поддерживаемые версии
JSON Все версии
CSV Все версии
XML Databricks Runtime 14.3 LTS и более поздних версий
Avro Databricks Runtime 10.4 LTS и более поздних версий
Parquet Databricks Runtime 11.3 LTS и более поздних версий
ORC Не поддерживается
Text Неприменимо (фиксированная схема)
Binaryfile Неприменимо (фиксированная схема)

Синтаксис для развития и вывода схемы

Указание целевого каталога для параметра cloudFiles.schemaLocation включает развитие и вывод схемы. Вы можете использовать тот же каталог, который указали для checkpointLocation. При использовании разностных динамических таблиц Azure Databricks автоматически управляет расположением схемы и другими сведениями о контрольной точке.

Примечание.

Если в целевую таблицу загружено несколько исходных данных, для каждой рабочей нагрузки приема автозагрузчика требуется отдельная контрольная точка потоковой передачи.

В следующем примере используется parquet для cloudFiles.format. Используйте csv, avro или json для других источников файлов. Все остальные параметры чтения и записи остаются неизменными для поведения по умолчанию для каждого формата.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Как работает вывод схемы автозагрузчика?

Чтобы определить схему при первом чтении данных, автозагрузчик примеров первых 50 ГБ или 1000 файлов, которые он обнаруживает, в зависимости от того, какой предел пересекается сначала. Автозагрузчик сохраняет сведения о схеме в каталоге _schemas , настроенном cloudFiles.schemaLocation для отслеживания изменений схемы входных данных с течением времени.

Примечание.

Чтобы изменить размер используемой выборки, можно задать конфигурации SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(байтовая строка, например 10gb)

и

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(целое число)

По умолчанию вывод схемы автозагрузчика стремится избежать проблем с эволюцией схемы из-за несоответствия типов. Для форматов, которые не кодируют типы данных (JSON, CSV и XML), автозагрузчик выводит все столбцы в виде строк (включая вложенные поля в JSON-файлах). Для форматов с типизированной схемой (Parquet и Avro) автозагрузчик примеров подмножества файлов и объединяет схемы отдельных файлов. Это поведение приводится в следующей таблице:

File format Тип данных с выводом по умолчанию
JSON Строка
CSV Строка
XML Строка
Avro Типы, закодированные в схеме Avro
Parquet Типы, закодированные в схеме Parquet

Apache Spark DataFrameReader использует другое поведение для вывода схемы, выбор типов данных для столбцов в json, CSV и XML-источниках на основе примеров данных. Чтобы включить это поведение с помощью автозагрузчика, задайте для параметра значение cloudFiles.inferColumnTypes true.

Примечание.

При выводе схемы для данных CSV автозагрузчик предполагает, что файлы содержат заголовки. Если CSV-файлы не содержат заголовков, укажите параметр .option("header", "false"). Кроме того, Auto Loader объединяет схемы всех файлов в примере, чтобы получить глобальную схему. После этого Auto Loader может считывать каждый файл в соответствии с заголовком и правильно анализировать данные в формате CSV.

Примечание.

Если столбец имеет разные типы данных в двух файлах Parquet, автозагрузчик выбирает самый широкий тип. Для переопределения этого варианта можно использовать schemaHints . При указании подсказок схемы автозагрузчик не приводит столбец к указанному типу, а указывает средству чтения Parquet читать столбец как указанный тип. В случае несоответствия столбец спасается в спасаемом столбце данных.

Как работает эволюция схемы автозагрузчика?

Автозагрузчик обнаруживает добавление новых столбцов при обработке данных. Когда автозагрузчик обнаруживает новый столбец, поток останавливается с UnknownFieldExceptionпомощью . Прежде чем поток выдает эту ошибку, автозагрузчик выполняет вывод схемы в последней микропакете данных и обновляет расположение схемы с последней схемой, объединяя новые столбцы в конец схемы. Типы данных существующих столбцов останутся неизменными.

Databricks рекомендует настроить потоки автозагрузчика с помощью заданий Databricks, чтобы автоматически перезапустить после таких изменений схемы.

Автозагрузчик поддерживает следующие режимы для развития схем, которые задаются в параметре cloudFiles.schemaEvolutionMode:

Режим Поведение при чтении нового столбца
addNewColumns (по умолчанию) Поток завершается ошибкой. Новые столбцы будут добавлены в схему. Существующие столбцы не развивают типы данных.
rescue Схема никогда не развивается, и поток не завершается ошибкой из-за изменений схемы. Все новые столбцы записываются в спасаемом столбце данных.
failOnNewColumns Поток завершается ошибкой. Поток не перезапускается, если указанная схема не обновляется или файл данных обижается.
none Не развивается схема, новые столбцы игнорируются, и данные не спасаются, если rescuedDataColumn параметр не задан. Поток не завершается ошибкой из-за изменений схемы.

Как секции работают с автозагрузчиком?

Автозагрузчик пытается определить столбцы секций из базовой структуры каталогов данных, если данные размещаются в секционирования стилей Hive. Например, путь к base_path/event=click/date=2021-04-01/f0.json файлу приводит к выводу date и event как столбцам секционирования. Если базовая структура каталога содержит конфликтующие секции Hive или не содержит секционирование стилей Hive, столбцы секционирования игнорируются.

Двоичные файлы (binaryFile) и text форматы файлов имеют фиксированные схемы данных, но поддерживают вывод столбцов секций. Databricks рекомендует задать cloudFiles.schemaLocation для этих форматов файлов. Это позволяет избежать возможных ошибок или потери информации и предотвращает вывод столбцов секций при каждом запуске автозагрузчика.

Столбцы секционирования не учитываются при развитии схемы. Если у вас была начальная структура каталога, например base_path/event=click/date=2021-04-01/f0.json, а затем начать получать новые файлы в качестве base_path/event=click/date=2021-04-01/hour=01/f1.json, автозагрузчик игнорирует столбец часа. Для записи сведений о новых столбцах секционирования задайте для cloudFiles.partitionColumns значение event,date,hour.

Примечание.

cloudFiles.partitionColumns Параметр принимает разделенный запятыми список имен столбцов. Анализируются только столбцы, которые существуют как key=value пары в структуре каталогов.

Что такое столбец спасенных данных?

При автоматической загрузке схемы автоматически добавляется в схему в качестве _rescued_dataспасенного столбца данных. Можно переименовать столбец или включить его в тех случаях, когда вы предоставляете схему параметром rescuedDataColumn.

Спасаемый столбец данных гарантирует, что столбцы, которые не соответствуют схеме, будут спасены вместо удаления. Столбец спасенных данных содержит любые данные, которые не анализируются по следующим причинам:

  • Столбец отсутствует в схеме.
  • Несоответствия типов.
  • Несоответствие регистра.

Столбец спасенных данных содержит JSON, содержащий спасаемые столбцы и путь к исходному файлу записи.

Примечание.

Синтаксические анализаторы JSON и CSV поддерживают три режима при анализе записей: PERMISSIVE, DROPMALFORMED и FAILFAST. При использовании вместе с rescuedDataColumn несоответствие типов данных не приводит к удалению записей в режиме DROPMALFORMED или возникновению ошибки в режиме FAILFAST. Будут удалены или приведут к ошибке только поврежденные записи, т. е. неполные или неправильные файлы JSON или CSV. Если вы используете badRecordsPath при синтаксическом анализе JSON или CSV, несоответствия типов данных не будут считаться неправильными записями при использовании rescuedDataColumn. В badRecordsPath хранятся только неполные и неправильные записи JSON или CSV.

Изменение поведения с учетом регистра

Если учет регистра не включен, столбцы abc, Abc и ABC считаются одним столбцом в целях вывода схемы. Выбор регистра является произвольным и зависит от данных выборки. Вы можете использовать указания схемы, чтобы определить, какой регистр следует использовать. После выбора и вывода схемы автозагрузчик не будет учитывать варианты регистра, которые не были выбраны с учетом схемы.

Если столбец восстановленных данных включен, поля, именованные в регистре, отличном от указанного в схеме, будут загружены в столбец _rescued_data. Измените это поведение, установив для параметра readerCaseSensitive значение false. В этом случае автозагрузчик будет считывать данные без учета регистра.

Переопределение вывода схемы с указанием схемы

Вы можете использовать подсказки схемы для принудительного применения сведений о схеме, которые вы знаете и ожидаете в выводимых схемах. Если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, double вместо него integer), можно указать произвольное количество подсказок для типов данных столбцов в виде строки с помощью синтаксиса спецификации схемы SQL, например следующего:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Список поддерживаемых типов данных см. в документации по типам данных.

Если столбец отсутствует в начале потока, можно также использовать указания схемы, чтобы добавить этот столбец в выведенную схему.

Ниже приведен пример выведенной схемы, чтобы прояснить роль указаний схемы.

Выведенная схема:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Задав следующие указания схемы:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

вы получите:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Примечание.

Поддержка указаний для схем Array и Map доступна в Databricks Runtime 9.1 LTS и более поздних версий.

Ниже приведен пример выведенной схемы со сложными типами, чтобы прояснить роль указаний схемы.

Выведенная схема:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Задав следующие указания схемы:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

вы получите:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Примечание.

Указания схемы используются только в том случае, если вы не предоставляете схема Автозагрузчику. Указания схемы можно использовать независимо от того, включен ли параметр cloudFiles.inferColumnTypes или отключен.