Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Эволюция схемы относится к способности системы адаптироваться к изменениям в структуре данных с течением времени. Эти изменения часто возникают при работе с полуструктурированных данных, потоками событий или сторонними источниками, в которых добавляются новые поля, смена типов данных или вложенные структуры.
Распространенные изменения включают:
- Новые столбцы: дополнительные поля, не определенные ранее, иногда с пользовательским значением обратной заполнения.
-
Переименование столбцов: изменение имени столбца, например из
namefull_name. - Исключенные столбцы: удаление столбцов из схемы таблицы.
-
Расширение типа: изменение типа столбца на более широкий. Например,
INTполе становитсяDOUBLE. -
Другие изменения типа: изменение типа столбца. Например,
INTполе становитсяSTRING.
Поддержка эволюции схемы имеет решающее значение для создания устойчивых, длительных конвейеров, которые могут включать изменение данных без частых обновлений вручную.
Components
Эволюция схемы Azure Databricks включает четыре основные категории компонентов, каждая обработка схемы изменяется независимо:
- Соединители: компоненты, которые принимают данные из внешних источников. К ним относятся соединители Auto Loader, Kafka, Kinesis и Lakeflow.
-
Средства синтаксического анализа формата: функции, декодирующие необработанные форматы, включая
from_json,from_avrofrom_xmlиfrom_protobuf. - Подсистемы: обработка обработчиков, выполняющих запросы, включая структурированную потоковую передачу.
- Наборы данных: потоковые таблицы, материализованные представления, разностные таблицы и представления, сохраняющие и обслуживающие данные.
Каждый компонент в эволюции схемы проектирования данных является независимым. Вы несете ответственность за настройку эволюции схемы в отдельных компонентах для достижения требуемого поведения в потоке обработки данных.
Например, при использовании Auto Loader для приема данных в таблицу Delta существуют две сохраненные схемы: одна управляется Auto Loader в расположении схемы, а другая — схема целевой таблицы Delta. В стабильном состоянии эти два одинаковы. Когда схема автозагрузчика изменяется на основе входящих данных, схема таблицы Delta также должна меняться, иначе запрос завершается ошибкой. В этом случае можно (a) обновить целевую схему таблицы Delta, включив эволюцию схемы или используя прямую команду DDL, или (b) выполните полную перезапись целевой таблицы Delta.
Поддержка эволюции схемы с помощью коннектора
В следующих разделах описано, как каждый компонент Azure Databricks обрабатывает различные типы изменений схемы.
Автозагрузчик
Автозагрузчик поддерживает изменения столбцов, но не изменения типов. Настройте автоматическую эволюцию схемы с помощью cloudFiles.schemaEvolutionMode и rescuedDataColumn. Вы можете вручную задать schemaHints или неизменяемый schema. При автоматическом изменении схемы поток сначала выдает ошибку. При перезапуске используется обновлённая схема. Узнайте , как работает эволюция схемы автозагрузчика?.
-
Новые столбцы: поддерживаемые, в зависимости от выбранного
schemaEvolutionModeпараметра. Сбой при перезапуске вручную, необходимый для добавления новых столбцов в схему. -
Переименование столбцов: поддерживается в зависимости от выбранного
schemaEvolutionModeпараметра. Переименованный столбец обрабатывается как добавленный новый столбец, а старый столбец заполняетсяNULLновыми строками. Сбой при перезапуске вручную, необходимого для обновления схемы. -
Исключенные столбцы: поддерживается. Обрабатывается как мягкое удаление, где заданы
NULLновые строки для удаленного столбца. -
Расширение типа: не поддерживается. Изменения типов фиксируются в
rescuedDataColumn, еслиrescueDataColumnустановлено иschemaEvolutionModeустановлено вrescue. В противном случае требуется изменение схемы вручную. -
Другие изменения типа: не поддерживается. Изменения типов фиксируются в
rescuedDataColumn, еслиrescueDataColumnустановлено иschemaEvolutionModeустановлено вrescue. В противном случае требуется изменение схемы вручную.
Соединитель Delta
Соединитель Delta может поддерживать эволюцию схемы. Если чтение из таблицы Delta с включенным сопоставлением столбцов и schemaTrackingLocation включено, она поддерживает эволюцию схемы для переименования столбцов и удаления столбцов. Необходимо задать правильную конфигурацию Spark для каждого из этих соответствующих изменений, чтобы развивать схему без остановки потока. В противном случае поток обновляет свою отслеживаемую схему при обнаружении изменений и затем останавливается. Затем необходимо вручную перезапустить потоковый запрос, чтобы возобновить обработку.
-
Новые столбцы: поддерживаемые. При
mergeSchemaвключении новые столбцы добавляются автоматически. В противном случае запрос завершается сбоем и необходимо перезапустить поток, чтобы добавить новые столбцы в схему, но для таблицы Delta не требуется перезапись. -
Переименование столбцов: поддерживается. При
mergeSchemaвключении переименование обрабатывается автоматически. В противном случае можно развивать схему в потоковом запросе с помощью конфигурации Sparkspark.databricks.delta.streaming.allowSourceColumnRename. -
Исключенные столбцы: поддерживается. При включении
mergeSchemaисключенные столбцы обрабатываются автоматически. В противном случае можно изменить схему в потоковом запросе с помощью конфигурации Sparkspark.databricks.delta.streaming.allowSourceColumnDrop. -
Расширение типа: поддерживается в Databricks Runtime 16.4 LTS и более поздних версиях. При включённом
mergeSchemaи расширении типов в целевой таблице изменения типов обрабатываются автоматически. Можно включить расширение типов с помощью свойства таблицыtype widening. - Другие изменения типа: не поддерживается.
Соединители SaaS и CDC
Соединители SaaS и CDC автоматически изменяют схему при изменении столбцов. Это обрабатывается с помощью автоматического перезапуска при обнаружении изменения. Для изменений типов требуется полное обновление.
- Новые столбцы: поддерживаемые. Запрос автоматически перезапускается для устранения несоответствия схемы.
- Переименование столбцов: поддерживается. Запрос автоматически перезапускается для устранения несоответствия схемы. Переименованный столбец рассматривается как добавленный новый столбец.
-
Исключенные столбцы: поддерживается. Удаленные столбцы обрабатываются как «мягкие» удаления, где для новых строк удаленного столбца устанавливается
NULL. - Расширение типа: не поддерживается. Для обновления схемы требуется полное пересоздание.
- Другие изменения типа: не поддерживается. Для обновления схемы требуется полное пересоздание.
Соединители Kinesis, Kafka, Pub/Sub и Pulsar
Не поддерживается эволюция собственной схемы. Каждая функция соединителя возвращает двоичный объект. Развитие схемы обрабатывается форматным парсером.
- Новые столбцы: обрабатываются парсером формата.
- Переименование столбцов: выполняется парсером формата.
- Удаленные столбцы: обрабатываются средством синтаксического анализа формата.
- Расширение типа: обрабатывается средство синтаксического анализа формата.
- Другие изменения типа: обрабатывается синтаксическим анализатором формата.
Поддержка эволюции схемы с помощью парсера форматов
from_json синтаксический анализатор
Парсер from_json не поддерживает эволюцию схемы. Необходимо вручную обновить схему. При использовании from_json в декларативных конвейерах Spark Lakeflow автоматическое изменение схемы можно включить с помощью schemaLocationKey и schemaEvolutionMode.
- Новые столбцы: если включена автоматическая эволюция схемы, она ведет себя как автозагрузчик.
- Переименование столбцов: если включена автоматическая эволюция схемы, она ведет себя как автозагрузчик.
- Удаленные столбцы: если включена автоматическая эволюция схемы, она ведет себя как автозагрузчик.
- Расширение типа: если включена автоматическая эволюция схемы, она ведет себя как автозагрузчик.
- Другие изменения типа: если включена автоматическая эволюция схемы, она ведет себя как автозагрузчик.
from_avro и from_protobuf парсеры
from_avro и from_protobuf синтаксические анализаторы ведут себя одинаково. Схема может быть получена из реестра схем Confluent, или пользователь может предоставить схему и обновить схему вручную. Отсутствует концепция эволюции схемы в пределах from_avro или from_protobuf функции. Она должна обрабатываться подсистемой выполнения и реестром схем.
- Новые столбцы: поддерживается в реестре схем Confluent. В противном случае пользователь должен вручную обновить схему.
- Переименование столбцов: поддерживается в реестре схем Confluent. В противном случае пользователь должен вручную обновить схему.
- Удаленные столбцы: поддерживается в реестре схем Confluent. В противном случае пользователь должен вручную обновить схему.
- Расширение типа: поддерживается в реестре схем Confluent. В противном случае пользователь должен вручную обновить схему.
- Другие изменения типов: поддерживается с Реестром схем Confluent. В противном случае пользователь должен вручную обновить схему.
from_csv и from_xml парсеры
from_csv и from_xml парсеры не поддерживают эволюцию схемы.
- Новые столбцы: не поддерживается
- Переименование столбцов: не поддерживается
- Удаленные столбцы: не поддерживается
- Расширение типа: не поддерживается
- Другие изменения типа: не поддерживается
Поддержка эволюции схемы движком системы
Структурированная потоковая передача
Схема потокового запроса фиксируется во время этапа планирования, и все микро-пакеты используют этот план без необходимости перепланирования. Если исходная схема изменяется в середине выполнения, запрос завершается сбоем, и пользователь должен перезапустить потоковый запрос, чтобы Spark смог перепланировать новую схему.
Набор данных, записываемый потоком, также должен поддерживать эволюцию схемы.
- Новые столбцы: поддерживаемые. Запрос завершается ошибкой, и необходимо перезапустить поток, чтобы устранить несоответствие схемы.
- Переименование столбцов: поддерживается. Запрос завершается ошибкой, и необходимо перезапустить поток, чтобы устранить несоответствие схемы.
- Исключенные столбцы: поддерживается. Запрос завершается ошибкой, и необходимо перезапустить поток, чтобы устранить несоответствие схемы.
- Расширение типов: поддерживается. Запрос завершается ошибкой, и необходимо перезапустить поток, чтобы устранить несоответствие схемы.
- Другие изменения типа: поддерживается. Запрос завершается ошибкой, и необходимо перезапустить поток, чтобы устранить несоответствие схемы.
Эволюция схемы по набору данных
Таблицы потоков
Потоковые таблицы по умолчанию поддерживают поведение эволюции схемы слияния. Для обновления схемы не требуется перезапуск вручную, но для изменения произвольной схемы требуется полное обновление.
- Новые столбцы: поддерживаемые. Запрос автоматически перезапускается, чтобы устранить несоответствие схемы.
- Переименование столбцов: поддерживается. Запрос перезапускается, чтобы устранить несоответствие схемы. Переименованный столбец рассматривается как добавленный новый столбец.
- Исключенные столбцы: поддерживается. Удаленные столбцы рассматриваются как мягкое удаление, где новые строки для удаленного столбца устанавливаются как NULL.
- Расширение типов: поддерживается. Расширение типов должно быть включено либо на уровне конвейера, либо непосредственно в таблице данных. См. расширение типов в декларативных потоках Lakeflow Spark.
- Другие изменения типа: не поддерживается. Для обновления схемы требуется полное пересоздание.
Материализованные представления
Любое обновление схемы или определяющего запроса активирует полный перекомпьютирование материализованного представления.
- Новые столбцы: выполняется полный пересчёт.
- Переименование столбца: вызван полный пересчет.
- Удаленные столбцы: выполняется полный пересчет.
- Расширение типа: выполняется полный пересчет.
- Другие изменения типа: инициирован полный пересчёт.
таблицы Delta
Разностные таблицы поддерживают различные конфигурации для обновления схемы таблицы, включая переименование, удаление и расширение типа столбцов без перезаписи данных таблицы. Поддерживаемые конфигурации включают эволюцию схемы слияния, сопоставление столбцов, расширение типов и перезапись схемы.
- Новые столбцы: поддерживаемые. Автоматическое развитие происходит при включении эволюции схемы слияния, без необходимости переписывания таблицы Delta. Если эволюция схемы соединения не включена, обновления не удаются.
-
Переименование столбцов: поддерживается. Можно переименовывать через ручные команды
ALTER TABLE DDL, когда включено сопоставление столбцов. Не требуется перезапись таблицы Delta. -
Исключенные столбцы: поддерживается. Можно вручную удалять столбцы с помощью команд
ALTER TABLE DDL, когда включено сопоставление столбцов. Не требуется перезапись таблицы Delta. -
Расширение типов: поддерживается. Автоматически применяет изменение типа при включении расширения типа и изменения схемы слияния. При включенном расширении типа вы можете вручную расширять столбцы с помощью команд, используя
ALTER TABLE DDL. Без настройки операции завершаются сбоем. См. типы Widen с автоматической эволюцией схемы. -
Другие изменения типа: поддерживается, но требуется полная перезапись разностной таблицы. Необходимо включить
overwriteSchema, что обеспечивает полную перезапись разностной таблицы. В противном случае операции завершаются ошибкой.
Views
Если представление имеет column_list, который не соответствует новой схеме, или имеет запрос, который не может быть разобран, представление становится недействительным. Если это не срабатывает, вы можете включить эволюцию схемы для изменений типов с помощью SCHEMA TYPE EVOLUTION, а также для новых, переименованных и удалённых столбцов с помощью SCHEMA EVOLUTION (что является расширенной версией эволюции типов).
-
Новые столбцы: поддерживаемые. В
SCHEMA EVOLUTIONрежиме представление автоматически развивается без каких-либо действий вручную, если нет явногоcolumn_list. В противном случае представление может стать недействительным, и пользователь не может запросить его. -
Переименование столбцов: поддерживается. В
SCHEMA EVOLUTIONрежиме представление автоматически развивается без каких-либо действий вручную, если нет явногоcolumn_list. В противном случае представление может стать недействительным. -
Исключенные столбцы: поддерживается. В
SCHEMA EVOLUTIONрежиме представление автоматически развивается без каких-либо действий вручную, если нет явногоcolumn_list. В противном случае представление может стать недействительным. -
Расширение типов: поддерживается. В режиме
SCHEMA TYPE EVOLUTIONпредставление автоматически адаптируется к любым изменениям в типах. ВSCHEMA EVOLUTIONрежиме представление автоматически развивается без каких-либо действий вручную, если нет явногоcolumn_list. В противном случае представление может стать недействительным. -
Другие изменения типа: поддерживается. В режиме
SCHEMA TYPE EVOLUTIONпредставление автоматически адаптируется к любым изменениям в типах. ВSCHEMA EVOLUTIONрежиме представление автоматически развивается без каких-либо действий вручную, если нет явногоcolumn_list. В противном случае представление может стать недействительным.
Example
В следующем примере показано, как принять топик Kafka с полезными данными, кодированными в Avro и зарегистрированными в Реестре схем Confluent, и записать их в управляемую таблицу Delta с включенной эволюцией схемы.
Ключевые моменты, иллюстрированные:
- Интегрируйтесь с коннектором Kafka.
- Декодирование записей Avro с помощью from_avro с реестром схем Kafka.
- Обработка эволюции схемы путем задания
avroSchemaEvolutionMode. - Записать в таблицу Delta с включенной функцией
mergeSchema, чтобы разрешать аддитивные изменения.
В коде предполагается, что у вас есть раздел Kafka с помощью реестра схем Confluent, выводя данные в кодировке Avro.
# ----- CONFIG: fill these in -----
# Catalog and schema:
CATALOG = "<catalog_name>"
SCHEMA = "<schema_name>"
# Schema Registry:
# (This is where the producer evolves the schema)
SCHEMA_REG = "<schema registry endpoint>"
SR_USER = "<api key>"
SR_PASS = "<api secret>"
# Confluent Cloud: SASL_SSL broker:
BOOTSTRAP = "<server:ip>"
# Kafka topic:
TOPIC = "<topic>"
# ----- end: config -----
BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_users"
CHECKPOINT = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/bronze_users"
# Kafka auth (example for Confluent Cloud SASL/PLAIN over SSL)
KAFKA_OPTS = {
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{SR_USER}' password='{SR_PASS}';"
}
# ----- Evolution knobs -----
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = True)
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro
# Build reader
reader = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP)
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
)
# Attach Kafka auth options
for k, v in KAFKA_OPTS.items():
reader = reader.option(k, v)
# --- No native schema evolution supported. Returns a binary blob. ---
raw_df = reader.load()
# Decode Avro with Schema Registry
# --- The format parser handles updating the schema using the schema registry ---
decoded = from_avro(
data=col("value"),
jsonFormatSchema=None, # using SR
subject=f"{TOPIC}-value",
schemaRegistryAddress=SCHEMA_REG,
options={
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{SR_USER}:{SR_PASS}",
# Behavior on schema changes:
"avroSchemaEvolutionMode": "restart", # fail-fast so you can restart and adopt new fields
"mode": "FAILFAST"
}
).alias("payload")
bronze_df = raw_df.select(decoded, "timestamp").select("payload.*", "timestamp")
# Write to a managed Delta table as a STREAM
# --- Need to enable schema evolution separately for streaming to a Delta separately with mergeSchema --
(bronze_df.writeStream
.format("delta")
.option("checkpointLocation", CHECKPOINT)
.option("ignoreChanges", "true")
.outputMode("append")
.option("mergeSchema", "true") # only supports adding new columns. Renaming, dropping, and type changes need to be handled separately.
.trigger(availableNow=True) # Use availableNow trigger for Databricks SQL/Unity Catalog
.toTable(BRONZE_TABLE)
)