Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
A evolução do esquema refere-se à capacidade de um sistema de se adaptar às mudanças na estrutura dos dados ao longo do tempo. Essas alterações são comuns ao trabalhar com dados semiestruturados, fluxos de eventos ou fontes de terceiros onde novos campos são adicionados, tipos de dados mudam ou estruturas aninhadas evoluem.
As alterações comuns incluem:
- Novas colunas: campos adicionais não definidos anteriormente, às vezes com um valor de preenchimento personalizado.
-
Renomeação de coluna: alterar o nome de uma coluna, por exemplo, de
nameparafull_name. - Colunas soltas: removendo colunas do esquema da tabela.
-
Alargamento de tipo: Alterar o tipo de uma coluna para um tipo mais amplo. Por exemplo, um
INTcampo que se tornaDOUBLE. -
Outras alterações de tipo: alterar o tipo de uma coluna. Por exemplo, um
INTcampo que se tornaSTRING.
O suporte à evolução do esquema é fundamental para a criação de pipelines resilientes e de longa execução que podem acomodar dados variáveis sem atualizações manuais frequentes.
Components
A evolução do esquema do Azure Databricks envolve quatro categorias de componentes principais, cada uma manipulando alterações de esquema de forma independente:
- Conectores: componentes que ingerem dados de fontes externas. Estes incluem conectores Auto Loader, Kafka, Kinesis e Lakeflow.
-
Analisadores de formato: funções que decodificam formatos brutos, incluindo
from_json,from_avro,from_xmle .from_protobuf - Mecanismos: mecanismos de processamento que executam consultas, incluindo Streaming Estruturado.
- Conjuntos de dados: tabelas de streaming, exibições materializadas, tabelas Delta e exibições que persistem e servem dados.
Cada componente na evolução do esquema da arquitetura de engenharia de dados é independente. Você é responsável por configurar a evolução do esquema em componentes individuais para alcançar o comportamento desejado em seu fluxo de processamento de dados.
Por exemplo, ao usar o Auto Loader para ingerir dados em uma tabela Delta, há dois esquemas persistentes — um é gerenciado pelo Auto Loader em seu local de esquema e o outro é o esquema da tabela Delta de destino. Em um estado estável, esses dois são os mesmos. Quando o Auto Loader evolui seu esquema, com base nos dados recebidos, a tabela Delta também deve evoluir seu esquema ou a consulta falhará. Nesse caso, você pode (a) atualizar o esquema da tabela Delta de destino habilitando a evolução do esquema ou usando um comando DDL direto, ou (b) fazer uma reescrita completa da tabela Delta de destino.
Suporte à evolução do esquema por conector
As seções a seguir detalham como cada componente do Azure Databricks lida com diferentes tipos de alterações de esquema.
Carregador Automático
O Auto Loader suporta alterações de coluna, mas não alterações de tipo. Configure a evolução automática do esquema com cloudFiles.schemaEvolutionMode e rescuedDataColumn. Você pode definir schemaHints manualmente ou um imutável schema. Ao evoluir o esquema automaticamente, o fluxo inicialmente falha. Na reinicialização, o esquema evoluído é usado. Consulte Como funciona a evolução do esquema do Auto Loader?.
-
Novas colunas: Suportado, dependendo do
schemaEvolutionModeselecionado. Falha com uma reinicialização manual necessária para adicionar novas colunas ao esquema. -
Renomeação de coluna: suportada, dependendo da opção selecionada em
schemaEvolutionMode. A coluna renomeada é tratada como uma nova coluna adicionada e a coluna antiga é preenchida comNULLpara novas linhas. Falha com uma reinicialização manual necessária para atualizar o esquema. -
Colunas removidas: Suportado. Tratadas como exclusões suaves, onde novas linhas para a coluna excluída são definidas como
NULL. -
Alargamento de tipos: Não é suportado. As alterações de tipo são capturadas no
rescuedDataColumnserescueDataColumntiver sido definido eschemaEvolutionModetiver sido definido comorescue. Caso contrário, requer uma alteração manual de esquema. -
Outras alterações de tipo: Não suportado. As alterações de tipo são capturadas no
rescuedDataColumnserescueDataColumntiver sido definido eschemaEvolutionModetiver sido definido comorescue. Caso contrário, requer uma alteração manual de esquema.
Conector delta
O conector Delta pode suportar a evolução do esquema. Se estiver a ler a partir de uma tabela Delta com mapeamento de colunas e schemaTrackingLocation ativados, suporta evolução de esquema para renomeação de colunas e colunas eliminadas. Você deve definir a configuração correta do Spark para cada uma dessas respetivas alterações para evoluir o esquema sem interromper o fluxo. Caso contrário, o fluxo evolui seu esquema rastreado sempre que uma alteração é detetada e, em seguida, para. Em seguida, reinicie manualmente a consulta de streaming para retomar o processamento.
-
Novas colunas: Suportado. Com ativado
mergeSchema, novas colunas são adicionadas automaticamente. Caso contrário, a consulta falha e tens de reiniciar o fluxo para adicionar as novas colunas ao esquema, mas a tabela Delta não requer uma reescrita. -
Renomeação de coluna: Suportada. Com ativado
mergeSchema, a renomeação é feita automaticamente. Caso contrário, pode evoluir o esquema dentro de uma consulta de streaming com a configuração do Sparkspark.databricks.delta.streaming.allowSourceColumnRename. -
Colunas removidas: Suportado. Com
mergeSchemaativado, as colunas eliminadas são tratadas automaticamente. Caso contrário, pode evoluir um esquema numa consulta de streaming com a configuraçãospark.databricks.delta.streaming.allowSourceColumnDropdo Spark. -
Alargamento de tipos: Suportado em Databricks Runtime 16.4 LTS e superiores. Com
mergeSchemaativado e alargamento de tipos ativados na tabela de destino, as alterações de tipo são geridas automaticamente. Você pode habilitar a ampliação de tipo com atype wideningpropriedade table. - Outras alterações de tipo: Não suportado.
Conectores SaaS e CDC
Os conectores SaaS e CDC evoluem o esquema automaticamente quando as colunas mudam. Isso é tratado por meio de uma reinicialização automática quando uma alteração é detetada. As alterações de tipo exigem uma atualização completa.
- Novas colunas: Suportado. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema. A coluna renomeada é tratada como uma nova coluna adicionada.
-
Colunas removidas: Suportado. As colunas descartadas são tratadas como exclusões suaves, onde novas linhas para a coluna excluída são definidas como
NULL. - Alargamento de tipos: Não é suportado. A atualização do esquema requer uma atualização completa.
- Outras alterações de tipo: Não suportado. A atualização do esquema requer uma atualização completa.
Conectores Kinesis, Kafka, Pub/Sub e Pulsar
Nenhuma evolução de esquema nativo suportada. Cada uma das funções do conector retorna um blob binário. A evolução do esquema é tratada pelo analisador de formato.
- Novas colunas: manipuladas pelo analisador de formato.
- Renomeação de coluna: manipulada pelo analisador de formato.
- Colunas soltas: manipuladas pelo analisador de formato.
- Alargamento de tipo: manipulado pelo analisador de formato.
- Outras alterações de tipo: manipuladas pelo analisador de formato.
Suporte à evolução do esquema pelo analisador de formato
from_json analisador
O from_json analisador não suporta a evolução do esquema. Você deve atualizar o esquema manualmente. Ao usar from_json dentro do Lakeflow Spark Declarative Pipelines, a evolução automática do esquema pode ser habilitada com schemaLocationKey e schemaEvolutionMode.
- Novas colunas: Quando a evolução automática do esquema é ativada, ela se comporta como o Carregador Automático.
- Renomeação de colunas: Quando a evolução automática do esquema é ativada, ela se comporta como o Carregador Automático.
- Colunas soltas: Quando a evolução automática do esquema é ativada, ela se comporta como o Carregador Automático.
- Alargamento de tipo: Quando a evolução automática do esquema está ativada, ela se comporta como o Carregador Automático.
- Outras alterações de tipo: Quando a evolução automática do esquema é ativada, ela se comporta como o Carregador Automático.
from_avro e from_protobuf analisadores
Os analisadores from_avro e from_protobuf se comportam de forma idêntica. O esquema pode ser buscado no Registro de Esquema Confluente ou o usuário pode fornecer um esquema e deve atualizá-lo manualmente. Não existe o conceito de evolução de esquema dentro da função from_avro ou da função from_protobuf; isso deve ser tratado pelo mecanismo de execução e pelo Registro de Esquema.
- Novas colunas: Suportadas com o Confluent Schema Registry. Caso contrário, o usuário deve atualizar o esquema manualmente.
- Renomeação de coluna: Suportado pelo Confluent Schema Registry. Caso contrário, o usuário deve atualizar o esquema manualmente.
- Colunas descartadas: Suportado com o Registro de Esquema Confluente. Caso contrário, o usuário deve atualizar o esquema manualmente.
- Ampliação de tipo: Suportado com o Registro de Esquema Confluente. Caso contrário, o usuário deve atualizar o esquema manualmente.
- Outras alterações de tipo: Suportado com o Confluent Schema Registry. Caso contrário, o usuário deve atualizar o esquema manualmente.
from_csv e from_xml analisadores
Os analisadores from_csv e from_xml não suportam a evolução do esquema.
- Novas colunas: Não suportado
- Renomear coluna: Não suportado
- Colunas eliminadas: Não suportado
- Ampliação de tipo: Não suportado
- Outras alterações de tipo: Não suportado
Suporte à evolução do esquema por mecanismo
Transmissão em Fluxo Estruturada
O esquema de uma consulta de streaming é definido durante a fase de planeamento, e todos os micro-batches reutilizam esse plano sem necessidade de replano. Se o esquema de origem for alterado no meio da execução, a consulta falhará e o usuário deverá reiniciar a consulta de streaming para que o Spark possa planejar novamente o novo esquema.
O conjunto de dados no qual o fluxo grava também deve oferecer suporte à evolução do esquema.
- Novas colunas: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Colunas removidas: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Alargamento de tipo: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
- Outras alterações de tipo: Suportado. A consulta falha e você deve reiniciar o fluxo para resolver a incompatibilidade de esquema.
Evolução do esquema por conjunto de dados
Tabelas de streaming
As tabelas de streaming suportam o comportamento de evolução do esquema de mesclagem por padrão. A atualização do esquema não requer uma reinicialização manual, mas alterações arbitrárias no esquema exigem uma atualização completa.
- Novas colunas: Suportado. A consulta é reiniciada automaticamente para resolver a incompatibilidade de esquema.
- Renomeação de coluna: Suportada. A consulta é reiniciada para resolver a incompatibilidade de esquema. A coluna renomeada é tratada como uma nova coluna adicionada.
- Colunas removidas: Suportado. As colunas descartadas são tratadas como exclusões suaves, onde novas linhas para a coluna excluída são definidas como NULL.
- Alargamento de tipo: Suportado. O alargamento de tipos deve ser ativado ao nível do pipeline ou diretamente na mesa. Ver alargamento de tipos em Oleodutos Declarativos Lakeflow Spark.
- Outras alterações de tipo: Não suportado. A atualização do esquema requer uma atualização completa.
Visões materializadas
Qualquer atualização do esquema ou da consulta definidora dispara um recálculo completo da exibição materializada.
- Novas colunas: Recomputação completa acionada.
- Renomeação de coluna: Recomputação completa iniciada.
- Colunas descartadas: Recomputação completa acionada.
- Ampliação de tipo: Recomputação total foi ativada.
- Outras alterações de tipo: Recomputação completa acionada.
Tabelas delta
As tabelas delta oferecem suporte a uma variedade de configurações para atualizar o esquema da tabela, incluindo renomear, soltar e ampliar o tipo de colunas sem reescrever os dados da tabela. As configurações suportadas incluem evolução do esquema de mesclagem, mapeamento de coluna, ampliação de tipo e overwriteSchema.
- Novas colunas: Suportado. Evolui automaticamente quando a evolução do esquema de mesclagem está habilitada, sem exigir uma reescrita de tabela Delta. Se a evolução do esquema de mesclagem não estiver habilitada, as atualizações falharão.
-
Renomeação de coluna: Suportada. Pode renomear através de comandos manuais
ALTER TABLE DDLcom o mapeamento de colunas ativado. Não requer uma reescrita de tabela Delta. -
Colunas removidas: Suportado. Pode remover colunas utilizando comandos manuais com o mapeamento de colunas
ALTER TABLE DDLativado. Não requer uma reescrita de tabela Delta. -
Alargamento de tipo: Suportado. Aplica automaticamente a mudança de tipo quando o alargamento de tipos e a evolução do esquema de fusão estão ativados. Você pode ampliar colunas usando comandos manuais
ALTER TABLE DDLquando a expansão de tipo está ativada. Sem nenhum dos dois configurados, as operações falham. Ver Alongar tipos com evolução automática de esquemas. -
Outras alterações de tipo: suportado, mas requer uma reescrita completa da tabela delta. Você deve habilitar
overwriteSchema, o que permite a regravação completa da Tabela Delta. Caso contrário, as operações falharão.
Views
Se o modo de exibição tiver um column_list que não corresponde ao novo esquema ou tiver uma consulta que não pode ser analisada, o modo de exibição se tornará inválido. Caso contrário, você pode habilitar a evolução de esquema para alterações de tipo com SCHEMA TYPE EVOLUTION e para alterações de tipo, bem como colunas novas, renomeadas e descartadas com SCHEMA EVOLUTION (que é um superconjunto de evolução de tipo).
-
Novas colunas: Suportado. Com o modo
SCHEMA EVOLUTION, evolui automaticamente a visualização sem qualquer intervenção manual se não houver nenhum explícitocolumn_list. Caso contrário, a exibição pode se tornar inválida e o usuário não pode consultá-la. -
Renomeação de colunas: Suportado. Com o modo
SCHEMA EVOLUTION, evolui automaticamente a visualização sem qualquer intervenção manual se não houver nenhum explícitocolumn_list. Caso contrário, a exibição pode se tornar inválida. -
Colunas removidas: Suportado. Com o modo
SCHEMA EVOLUTION, evolui automaticamente a visualização sem qualquer intervenção manual se não houver nenhum explícitocolumn_list. Caso contrário, a exibição pode se tornar inválida. -
Alargamento de tipo: Suportado. No modo
SCHEMA TYPE EVOLUTION, a visualização evolui automaticamente para qualquer alteração de tipo. Com o modoSCHEMA EVOLUTION, evolui automaticamente a visualização sem qualquer intervenção manual se não houver nenhum explícitocolumn_list. Caso contrário, a exibição pode se tornar inválida. -
Outras alterações de tipo: Suportado. No modo
SCHEMA TYPE EVOLUTION, a visualização evolui automaticamente para qualquer alteração de tipo. Com o modoSCHEMA EVOLUTION, evolui automaticamente a visualização sem qualquer intervenção manual se não houver nenhum explícitocolumn_list. Caso contrário, a exibição pode se tornar inválida.
Example
O exemplo a seguir mostra como ingerir um tópico Kafka com cargas codificadas em Avro registradas no Confluent Schema Registry e gravá-las em uma tabela Delta gerenciada com a evolução do esquema habilitada.
Pontos principais ilustrados:
- Integre com o Kafka Connector.
- Decodifique registos Avro usando from_avro com um Registro de Esquema Kafka.
- Manipule a evolução do esquema definindo
avroSchemaEvolutionMode. - Escreva em uma tabela Delta com
mergeSchemaativado para permitir alterações aditivas.
O código pressupõe que você tenha um tópico Kafka usando o registro de esquema Confluent, emitindo dados codificados em Avro.
# ----- CONFIG: fill these in -----
# Catalog and schema:
CATALOG = "<catalog_name>"
SCHEMA = "<schema_name>"
# Schema Registry:
# (This is where the producer evolves the schema)
SCHEMA_REG = "<schema registry endpoint>"
SR_USER = "<api key>"
SR_PASS = "<api secret>"
# Confluent Cloud: SASL_SSL broker:
BOOTSTRAP = "<server:ip>"
# Kafka topic:
TOPIC = "<topic>"
# ----- end: config -----
BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_users"
CHECKPOINT = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/bronze_users"
# Kafka auth (example for Confluent Cloud SASL/PLAIN over SSL)
KAFKA_OPTS = {
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{SR_USER}' password='{SR_PASS}';"
}
# ----- Evolution knobs -----
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = True)
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro
# Build reader
reader = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP)
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
)
# Attach Kafka auth options
for k, v in KAFKA_OPTS.items():
reader = reader.option(k, v)
# --- No native schema evolution supported. Returns a binary blob. ---
raw_df = reader.load()
# Decode Avro with Schema Registry
# --- The format parser handles updating the schema using the schema registry ---
decoded = from_avro(
data=col("value"),
jsonFormatSchema=None, # using SR
subject=f"{TOPIC}-value",
schemaRegistryAddress=SCHEMA_REG,
options={
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{SR_USER}:{SR_PASS}",
# Behavior on schema changes:
"avroSchemaEvolutionMode": "restart", # fail-fast so you can restart and adopt new fields
"mode": "FAILFAST"
}
).alias("payload")
bronze_df = raw_df.select(decoded, "timestamp").select("payload.*", "timestamp")
# Write to a managed Delta table as a STREAM
# --- Need to enable schema evolution separately for streaming to a Delta separately with mergeSchema --
(bronze_df.writeStream
.format("delta")
.option("checkpointLocation", CHECKPOINT)
.option("ignoreChanges", "true")
.outputMode("append")
.option("mergeSchema", "true") # only supports adding new columns. Renaming, dropping, and type changes need to be handled separately.
.trigger(availableNow=True) # Use availableNow trigger for Databricks SQL/Unity Catalog
.toTable(BRONZE_TABLE)
)