Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
A evolução do esquema refere-se à capacidade de um sistema de se adaptar às alterações na estrutura de dados ao longo do tempo. Essas alterações são comuns ao trabalhar com dados semiestruturados, fluxos de eventos ou fontes de terceiros em que novos campos são adicionados, os tipos de dados mudam ou as estruturas aninhadas evoluem.
As alterações comuns incluem:
- Novas colunas: campos adicionais não definidos anteriormente, às vezes com um valor de backfill personalizado.
-
Renomeação de coluna: alterando um nome de coluna, por exemplo, de
nameparafull_name. - Colunas descartadas: removendo colunas do esquema de tabela.
-
Ampliação de tipo: alterando o tipo de uma coluna para uma mais ampla. Por exemplo, um
INTcampo se tornandoDOUBLE. -
Outras alterações de tipo: alterando o tipo de uma coluna. Por exemplo, um
INTcampo se tornandoSTRING.
O suporte à evolução do esquema é fundamental para a criação de pipelines resilientes e de execução longa que podem acomodar a alteração de dados sem atualizações manuais frequentes.
Components
A evolução do esquema do Azure Databricks envolve quatro categorias de componentes principais, cada uma tratando alterações de esquema de forma independente:
- Conectores: componentes que ingerem dados de fontes externas. Isso inclui conectores do Auto Loader, Kafka, Kinesis e Lakeflow.
-
Analisadores de formato: funções que decodificam formatos brutos, incluindo
from_json,from_avro,from_xmlefrom_protobuf. - Mecanismos: mecanismos de processamento que executam consultas, incluindo Streaming Estruturado.
- Conjuntos de dados: tabelas de streaming, exibições materializadas, tabelas Delta e exibições que persistem e servem dados.
Cada componente na evolução do esquema da arquitetura de engenharia de dados é independente. Você é responsável por configurar a evolução do esquema em componentes individuais para alcançar o comportamento desejado no fluxo de processamento de dados.
Por exemplo, ao usar o Carregador Automático para ingerir dados em uma tabela Delta, há dois esquemas persistentes: um é gerenciado pelo Carregador Automático em seu local de esquema e o outro é o esquema da tabela Delta de destino. Em um estado estável, esses dois são iguais. Quando o Carregador Automático evolui seu esquema, com base nos dados de entrada, a tabela Delta também deve evoluir seu esquema ou a consulta falha. Nesse caso, você pode (a) atualizar o esquema de tabela Delta de destino habilitando a evolução do esquema ou usando um comando DDL direto ou (b) fazer uma reescrita completa da tabela Delta de destino.
Suporte à evolução do esquema por conector
As seções a seguir detalham como cada componente do Azure Databricks lida com diferentes tipos de alterações de esquema.
Carregador Automático
O Carregador Automático dá suporte a alterações de coluna e ampliação de tipo. Configure a evolução automática do esquema com cloudFiles.schemaEvolutionMode e rescuedDataColumn. Você pode definir schemaHints manualmente ou um imutável schema. Ao evoluir o esquema automaticamente, o fluxo falha inicialmente. Na reinicialização, o esquema evoluído é usado. Veja como funciona a evolução do esquema do Carregador Automático?.
-
Novas colunas: Suportado, dependendo da opção
schemaEvolutionModeescolhida. É necessária uma reinicialização manual devido a uma falha para adicionar novas colunas ao esquema. -
Renomeação de coluna: Suportada, dependendo do
schemaEvolutionModeselecionado. A coluna renomeada é tratada como uma nova coluna adicionada, e a coluna antiga é preenchida comNULLpara as novas linhas. Falha com uma reinicialização manual necessária para atualizar o esquema. -
Colunas removidas: suportado. Tratadas como exclusões suaves, em que novas linhas para a coluna excluída são definidas como
NULL. -
Ampliação de tipo: Suportado no Databricks Runtime 16.4 e superiores com
schemaEvolutionModedefinido comoaddNewColumnsWithTypeWidening. As alterações de tipo de dados com suporte são ampliadas automaticamente. Mudanças de tipo não suportadas são capturadas norescuedDataColumn. Veja a ampliação automática de tipo com o Carregador Automático. -
Outras alterações de tipo: sem suporte. As alterações de tipo são capturadas no
rescuedDataColumnserescueDataColumnfoi definido eschemaEvolutionModedefinido comorescue. Caso contrário, requer uma alteração de esquema manual.
Conector Delta
O conector Delta pode dar suporte à evolução do esquema. Se estiver lendo de uma tabela Delta com mapeamento de coluna e schemaTrackingLocation habilitado, há suporte à evolução do esquema de dados para renomeação e descarte de colunas. Você deve definir a configuração do Spark correta para cada uma dessas respectivas alterações para evoluir o esquema sem parar o fluxo. Caso contrário, o fluxo atualiza seu esquema rastreado sempre que uma alteração é detectada e, em seguida, para. Em seguida, você deve reiniciar manualmente a consulta de streaming para retomar o processamento.
-
Novas colunas: Suportado. Com
mergeSchemahabilitado, novas colunas são adicionadas automaticamente. Caso contrário, a consulta falhará e você deverá reiniciar o fluxo para adicionar as novas colunas ao esquema, mas a tabela Delta não exigirá uma reescrita. -
Renomeação de coluna: Suportada. Com
mergeSchemahabilitado, a renomeação é tratada automaticamente. Caso contrário, você poderá evoluir o esquema em uma consulta de streaming com a configuraçãospark.databricks.delta.streaming.allowSourceColumnRenamedo Spark. -
Colunas removidas: suportado. Com
mergeSchemahabilitado, as colunas descartadas são tratadas automaticamente. Caso contrário, você poderá evoluir o esquema em uma consulta de streaming com a configuraçãospark.databricks.delta.streaming.allowSourceColumnDropdo Spark. -
Ampliação de tipo: suportado no Databricks Runtime 16.4 LTS e versões superiores. Com
mergeSchemahabilitado e o ajuste de tipo ativado na tabela de destino, as alterações de tipo são tratadas automaticamente. Você pode habilitar a ampliação de tipo com a propriedade datype wideningtabela. - Outras alterações de tipo: sem suporte.
Conectores SaaS e CDC
Os conectores SaaS e CDC evoluem o esquema automaticamente quando as colunas são alteradas. Isso é tratado por meio de uma reinicialização automática quando uma alteração é detectada. As alterações de tipo exigem uma atualização completa.
- Novas colunas: Suportado. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema. A coluna renomeada é tratada como uma nova coluna adicionada.
-
Colunas removidas: suportado. As colunas descartadas são tratadas como exclusões lógicas, em que novas linhas para a coluna excluída são definidas como
NULL. - Ampliação de tipo: sem suporte. Atualizar o esquema requer uma atualização completa.
- Outras alterações de tipo: sem suporte. Atualizar o esquema requer uma atualização completa.
Conectores Kinesis, Kafka, Pub/Sub e Pulsar
Não há suporte para evolução de esquema nativo. Cada uma das funções do conector retorna um blob binário. A evolução do esquema é tratada pelo analisador de formato.
- Novas colunas: manipuladas pelo analisador de formato.
- Renomeação de coluna: tratado pelo analisador de formato.
- Colunas descartadas: manipuladas pelo analisador de formato.
- Ampliação de tipo: manipulado pelo analisador de formato.
- Outras alterações de tipo: manipuladas pelo analisador de formato.
Suporte à evolução do esquema por analisador de formato
from_json analisador sintático
O from_json analisador não dá suporte à evolução do esquema. Você deve atualizar o esquema manualmente. Ao usar from_json no Lakeflow Spark Declarative Pipelines, a evolução automática do esquema pode ser habilitada com schemaLocationKey e schemaEvolutionMode.
- Novas colunas: quando a evolução automática do esquema está habilitada, ela se comporta como Carregador Automático.
- Renomeação de colunas: quando a evolução automática do esquema está habilitada, ela se comporta como Carregador Automático.
- Colunas descartadas: quando a evolução automática do esquema está habilitada, ela se comporta como o Auto Loader.
- Ampliação de tipo: quando a evolução automática do esquema está habilitada, ela se comporta como Auto Loader.
- Outras alterações de tipo: quando a evolução automática do esquema está habilitada, ela se comporta como Carregador Automático.
from_avro e from_protobuf analisadores
Os analisadores from_avro e from_protobuf se comportam da mesma maneira. O esquema pode ser buscado no Registro de Esquema do Confluent ou o usuário pode fornecer um esquema e deve atualizar o esquema manualmente. Não há nenhum conceito de evolução do esquema dentro da from_avro função ou from_protobuf função; ela deve ser tratada pelo mecanismo de execução e pelo Registro de Esquema.
- Novas colunas: suportadas pelo Registro de Esquema do Confluent. Caso contrário, o usuário deverá atualizar o esquema manualmente.
- Renomeação de coluna: é compatível com o Confluent Schema Registry. Caso contrário, o usuário deverá atualizar o esquema manualmente.
- Colunas descartadas: compatível com o Registro de Esquema do Confluent. Caso contrário, o usuário deverá atualizar o esquema manualmente.
- Ampliação de tipo: compatível com o Schema Registry do Confluent. Caso contrário, o usuário deverá atualizar o esquema manualmente.
- Outras alterações de tipo: Suportado pelo Registro de Esquema do Confluent. Caso contrário, o usuário deverá atualizar o esquema manualmente.
from_csv e from_xml analisadores
Os analisadores from_csv e from_xml não dão suporte à evolução do esquema.
- Novas colunas: sem suporte
- Renomeação de coluna: Não suportado
- Colunas descartadas: sem suporte
- Ampliação de tipo: sem suporte
- Outras alterações de tipo: sem suporte
Suporte à evolução do esquema por mecanismo
Streaming estruturado
O esquema de uma consulta de streaming é definido durante a etapa de planejamento, e todos os microlotes reutilizam esse plano sem replanejamento. Se o esquema de origem mudar no meio da execução, a consulta falhará e o usuário deverá reiniciar a consulta de streaming para que o Spark possa planejar novamente o novo esquema.
O conjunto de dados em que o fluxo escreve também deve dar suporte à evolução do esquema.
- Novas colunas: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Colunas removidas: suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Ampliação de tipo: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Outras alterações de tipo: suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
Evolução do esquema por conjunto de dados
Tabelas de streaming
As tabelas de streaming dão suporte ao comportamento de evolução do esquema de mesclagem por padrão. Atualizar o esquema não requer uma reinicialização manual, mas alterações arbitrárias de esquema exigem uma atualização completa.
- Novas colunas: Suportado. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta é reiniciada para resolver a incompatibilidade de esquema. A coluna renomeada é tratada como uma nova coluna adicionada.
- Colunas removidas: suportado. As colunas descartadas são tratadas como exclusões suaves, em que novas linhas para a coluna excluída são definidas como NULL.
- Ampliação de tipo: Suportado. A ampliação de tipo deve ser habilitada ao nível do pipeline ou diretamente na tabela. Consulte a expansão de tipo em Pipelines Declarativos do Lakeflow Spark.
- Outras alterações de tipo: sem suporte. Atualizar o esquema requer uma atualização completa.
Visões materializadas
Qualquer atualização do esquema ou da consulta de definição dispara uma recomputação completa da exibição materializada.
- Novas colunas: Recomputação completa iniciada.
- Coluna renomeada: Recálculo completo acionado.
- Colunas descartadas: Recalculo completo disparado.
- Ampliação de tipo: recálculo completo iniciado.
- Outras alterações de tipo: recálculo completo iniciado.
Tabelas delta
As tabelas Delta dão suporte a uma variedade de configurações para atualizar o esquema de tabela, incluindo renomeação, remoção e ampliação do tipo de colunas sem reescrever dados da tabela. As configurações com suporte incluem evolução do esquema de mesclagem, mapeamento de coluna, ampliação de tipo e substituição do esquema.
- Novas colunas: Suportado. Evolui automaticamente quando a evolução do esquema de mesclagem é habilitada, sem a necessidade de uma reescrita de tabela Delta. Se a evolução do esquema de mesclagem não estiver habilitada, as atualizações falharão.
-
Renomeação de coluna: Suportada. Com o mapeamento de coluna habilitado, pode renomear por meio de comandos manuais
ALTER TABLE DDL. Não requer uma reescrita de tabela Delta. -
Colunas removidas: suportado. Pode eliminar colunas por meio de comandos manuais
ALTER TABLE DDLcom o mapeamento de coluna habilitado. Não requer uma reescrita de tabela Delta. -
Ampliação de tipo: Suportado. Aplica automaticamente a alteração de tipo quando a evolução do esquema de mesclagem e ampliação de tipo é habilitada. Você pode ampliar colunas por meio de comandos manuais
ALTER TABLE DDLquando a ampliação de tipo estiver habilitada. Sem nenhuma das operações configuradas, as operações falham. Consulte Expandir tipos com evolução automática de esquema. -
Outras alterações de tipo: com suporte, mas requer uma reescrita completa da Tabela Delta. Você deve habilitar
overwriteSchema, o que permite uma reescrita completa da Tabela Delta. Caso contrário, as operações falharão.
Visões
Se o modo de exibição tiver um column_list que não está em conformidade com o novo esquema, ou tiver uma consulta que não possa ser analisada, o modo de exibição se tornará inválido. Caso contrário, você poderá habilitar a evolução do esquema para alterações de tipo com SCHEMA TYPE EVOLUTION e para alterações de tipo, bem como colunas novas, renomeadas e descartadas com SCHEMA EVOLUTION (que é um superconjunto de evolução de tipo).
-
Novas colunas: Suportado. Com o modo
SCHEMA EVOLUTION, o modo de exibição evolui automaticamente sem intervenção manual se não houvercolumn_listexplícito. Caso contrário, a visualização pode se tornar inválida e o usuário não conseguirá consultá-la. -
Renomeação de colunas: Suportado. Com o modo
SCHEMA EVOLUTION, o modo de exibição evolui automaticamente sem intervenção manual se não houvercolumn_listexplícito. Caso contrário, a visão pode se tornar inválida. -
Colunas removidas: suportado. Com o modo
SCHEMA EVOLUTION, o modo de exibição evolui automaticamente sem intervenção manual se não houvercolumn_listexplícito. Caso contrário, a visão pode se tornar inválida. -
Ampliação de tipo: Suportado. No modo
SCHEMA TYPE EVOLUTION, a exibição evolui automaticamente para qualquer mudança de tipo. Com o modoSCHEMA EVOLUTION, o modo de exibição evolui automaticamente sem intervenção manual se não houvercolumn_listexplícito. Caso contrário, a visão pode se tornar inválida. -
Outras alterações de tipo: suportado. No modo
SCHEMA TYPE EVOLUTION, a exibição evolui automaticamente para qualquer mudança de tipo. Com o modoSCHEMA EVOLUTION, o modo de exibição evolui automaticamente sem intervenção manual se não houvercolumn_listexplícito. Caso contrário, a visão pode se tornar inválida.
Example
O exemplo a seguir mostra como ingerir um tópico do Kafka com cargas codificadas em Avro registradas no Registro de Esquema do Confluent e gravá-las em uma tabela Delta gerenciada com a evolução do esquema habilitada.
Pontos-chave ilustrados:
- Integre com o Conector Kafka.
- Decodificar registros Avro usando from_avro com o Registro de Esquema do Kafka.
- Manipular a evolução do esquema definindo
avroSchemaEvolutionMode. - Escreva em uma tabela Delta com
mergeSchemahabilitado para permitir alterações aditivas.
O código pressupõe que você tenha um tópico Kafka usando o Confluent Schema Registry, gerando dados codificados em Avro.
# ----- CONFIG: fill these in -----
# Catalog and schema:
CATALOG = "<catalog_name>"
SCHEMA = "<schema_name>"
# Schema Registry:
# (This is where the producer evolves the schema)
SCHEMA_REG = "<schema registry endpoint>"
SR_USER = "<api key>"
SR_PASS = "<api secret>"
# Confluent Cloud: SASL_SSL broker:
BOOTSTRAP = "<server:ip>"
# Kafka topic:
TOPIC = "<topic>"
# ----- end: config -----
BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_users"
CHECKPOINT = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/bronze_users"
# Kafka auth (example for Confluent Cloud SASL/PLAIN over SSL)
KAFKA_OPTS = {
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{SR_USER}' password='{SR_PASS}';"
}
# ----- Evolution knobs -----
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = True)
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro
# Build reader
reader = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP)
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
)
# Attach Kafka auth options
for k, v in KAFKA_OPTS.items():
reader = reader.option(k, v)
# --- No native schema evolution supported. Returns a binary blob. ---
raw_df = reader.load()
# Decode Avro with Schema Registry
# --- The format parser handles updating the schema using the schema registry ---
decoded = from_avro(
data=col("value"),
jsonFormatSchema=None, # using SR
subject=f"{TOPIC}-value",
schemaRegistryAddress=SCHEMA_REG,
options={
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{SR_USER}:{SR_PASS}",
# Behavior on schema changes:
"avroSchemaEvolutionMode": "restart", # fail-fast so you can restart and adopt new fields
"mode": "FAILFAST"
}
).alias("payload")
bronze_df = raw_df.select(decoded, "timestamp").select("payload.*", "timestamp")
# Write to a managed Delta table as a STREAM
# --- Need to enable schema evolution separately for streaming to a Delta separately with mergeSchema --
(bronze_df.writeStream
.format("delta")
.option("checkpointLocation", CHECKPOINT)
.option("ignoreChanges", "true")
.outputMode("append")
.option("mergeSchema", "true") # only supports adding new columns. Renaming, dropping, and type changes need to be handled separately.
.trigger(availableNow=True) # Use availableNow trigger for Databricks SQL/Unity Catalog
.toTable(BRONZE_TABLE)
)