Schema-evolutie in Azure Databricks

Schemaontwikkeling verwijst naar de mogelijkheid van een systeem om zich aan te passen aan wijzigingen in de structuur van gegevens in de loop van de tijd. Deze wijzigingen zijn gebruikelijk bij het werken met semi-gestructureerde gegevens, gebeurtenisstromen of bronnen van derden waar nieuwe velden worden toegevoegd, gegevenstypen worden verplaatst of geneste structuren zich ontwikkelen.

Veelvoorkomende wijzigingen zijn:

  • Nieuwe kolommen: Aanvullende velden die niet eerder zijn gedefinieerd, soms met een aangepaste backfillwaarde.
  • Kolomnaam wijzigen: een kolomnaam wijzigen, bijvoorbeeld van name naar full_name.
  • Verwijderde kolommen: Kolommen verwijderen uit het tabelschema.
  • Type breder maken: het type van een kolom wijzigen in een breder type. Een bijvoorbeeld INT veld wordt DOUBLE.
  • Andere typewijzigingen: het type van een kolom wijzigen. Een bijvoorbeeld INT veld wordt STRING.

Het ondersteunen van de ontwikkeling van schema's is essentieel voor het bouwen van robuuste, langlopende pijplijnen die geschikt zijn voor het wijzigen van gegevens zonder regelmatige handmatige updates.

Components

De ontwikkeling van azure Databricks-schema's omvat vier hoofdonderdeelcategorieën, waarbij schemawijzigingen onafhankelijk worden verwerkt:

  1. Connectors: onderdelen die gegevens uit externe bronnen opnemen. Dit zijn onder andere Auto Loader, Kafka-, Kinesis- en Lakeflow-connectors.
  2. Formaatparsers: Functies die onbewerkte indelingen decoderen, waaronder from_json, from_avro, from_xml, en from_protobuf.
  3. Engines: Verwerkingsengines die query's uitvoeren, waaronder Structured Streaming.
  4. Gegevenssets: Streamingtabellen, gematerialiseerde weergaven, Delta-tabellen en weergaven die gegevens opslaan en verwerken.

Ontwikkeling van schema's

Elk onderdeel in de ontwikkeling van het schema van de data engineering-architectuur is onafhankelijk. U bent verantwoordelijk voor het configureren van de ontwikkeling van schema's in afzonderlijke onderdelen om het gewenste gedrag in uw gegevensverwerkingsstroom te bereiken.

Wanneer u bijvoorbeeld automatisch laden gebruikt om gegevens op te nemen in een Delta-tabel, zijn er twee persistente schema's: een wordt beheerd door Auto Loader op de schemalocatie en de andere is het schema van de delta-doeltabel. In een stabiele toestand zijn die twee hetzelfde. Wanneer automatisch laadprogramma het schema ontwikkelt, op basis van binnenkomende gegevens, moet de Delta-tabel ook het schema ontwikkelen of mislukt de query. In dat geval kunt u (a) het schema van de delta-doeltabel bijwerken door schemaontwikkeling in te schakelen of door een directe DDL-opdracht te gebruiken, of (b) de doel-Delta-tabel volledig herschrijven.

Ondersteuning voor schemaontwikkeling per connector

In de volgende secties wordt beschreven hoe elk Azure Databricks-onderdeel verschillende typen schemawijzigingen verwerkt.

Automatische lader

AutoLoader ondersteunt kolomwijzigingen en typebreiding. Configureer automatische ontwikkeling van schema's met cloudFiles.schemaEvolutionMode en rescuedDataColumn. U kunt handmatig schemaHints instellen of een onveranderbare schema instellen. Bij het automatisch ontwikkelen van het schema mislukt de stream in eerste instantie. Bij het opnieuw opstarten wordt het ontwikkelde schema gebruikt. Zie Hoe werkt de evolutie van het Auto Loader-schema?

  • Nieuwe kolommen: Ondersteund, afhankelijk van de schemaEvolutionMode geselecteerde. Mislukt met handmatig opnieuw opstarten vereist om nieuwe kolommen toe te voegen aan het schema.
  • Kolomnaam wijzigen: Ondersteund, afhankelijk van de schemaEvolutionMode geselecteerde. De hernoemde kolom wordt behandeld als een nieuwe kolom die is toegevoegd, en de oude kolom wordt gevuld met NULL voor nieuwe rijen. Er is een fout opgetreden en een handmatige herstart is vereist om het schema bij te werken.
  • Verwijderde kolommen: ondersteund. Behandeld als 'soft deletes', waarbij voor de verwijderde kolom nieuwe rijen worden ingesteld op NULL.
  • Type widening: ondersteund in Databricks Runtime 16.4 en hoger met schemaEvolutionMode ingesteld op addNewColumnsWithTypeWidening. Ondersteunde wijzigingen in gegevenstypen worden automatisch uitgebreid. Niet-ondersteunde typewijzigingen worden vastgelegd in de rescuedDataColumn. Zie Automatische typeverbreding met Auto Loader.
  • Andere typewijzigingen: niet ondersteund. Typewijzigingen worden vastgelegd in de rescuedDataColumn als rescueDataColumn is ingesteld en schemaEvolutionMode is ingesteld op rescue. Anders is een handmatige schemawijziging vereist.

Deltaconnector

De Delta-connector kan de ontwikkeling van schema's ondersteunen. Als het lezen vanuit een Delta-tabel met kolomtoewijzing en schemaTrackingLocation is ingeschakeld, ondersteunt deze de ontwikkeling van schema's voor kolomhernoeming en verwijderde kolommen. U moet de juiste Spark-configuratie instellen voor elk van deze respectieve wijzigingen om het schema te ontwikkelen zonder de stream te stoppen. Anders evolveert de stroom zijn bijgehouden schema wanneer er een wijziging wordt gedetecteerd en stopt vervolgens. Vervolgens moet u de streamingquery handmatig opnieuw starten om de verwerking te hervatten.

  • Nieuwe kolommen: ondersteund. Als mergeSchema deze optie is ingeschakeld, worden nieuwe kolommen automatisch toegevoegd. Anders mislukt de query en moet u de stream opnieuw starten om de nieuwe kolommen toe te voegen aan het schema, maar de Delta-tabel vereist geen herschrijfbewerking.
  • Kolomnaam wijzigen: ondersteund. Wanneer mergeSchema is ingeschakeld, wordt het hernoemen automatisch afgehandeld. Anders kunt u een schema ontwikkelen binnen een streamingquery met de Spark-configuratie spark.databricks.delta.streaming.allowSourceColumnRename.
  • Verwijderde kolommen: ondersteund. Als mergeSchema deze optie is ingeschakeld, worden verwijderde kolommen automatisch verwerkt. Anders kunt u een schema ontwikkelen binnen een streamingquery met de Spark-configuratie spark.databricks.delta.streaming.allowSourceColumnDrop.
  • Type widening: ondersteund in Databricks Runtime 16.4 LTS en hoger. Met mergeSchema ingeschakeld en typeverbreding ingeschakeld op de doeltabel, worden typewijzigingen automatisch verwerkt. U kunt type widening inschakelen met de tabeleigenschap type widening.
  • Andere typewijzigingen: niet ondersteund.

SaaS- en CDC-connectors

De SaaS- en CDC-connectors ontwikkelen het schema automatisch wanneer kolommen worden gewijzigd. Dit wordt afgehandeld via een automatische herstart wanneer een wijziging wordt gedetecteerd. Voor typewijzigingen is een volledige vernieuwing vereist.

  • Nieuwe kolommen: ondersteund. De query wordt automatisch opnieuw opgestart om schemaovereenkomsten op te lossen.
  • Kolomnaam wijzigen: ondersteund. De query wordt automatisch opnieuw opgestart om schemaovereenkomsten op te lossen. De hernoemde kolom wordt behandeld als een nieuwe kolom toegevoegd.
  • Verwijderde kolommen: ondersteund. Verwijderde kolommen worden behandeld als soft deletes, waarbij voor nieuwe rijen de verwijderde kolom is ingesteld op NULL.
  • Type breder maken: niet ondersteund. Voor het bijwerken van het schema is een volledige vernieuwing vereist.
  • Andere typewijzigingen: niet ondersteund. Voor het bijwerken van het schema is een volledige vernieuwing vereist.

Kinesis, Kafka, Pub/Sub en Pulsar connectors

Er wordt geen systeemeigen schemaontwikkeling ondersteund. Elk van de connectorfuncties retourneert een binaire blob. Schema-evolutie wordt verwerkt door de formaatparser.

  • Nieuwe kolommen: verwerkt door de opmaakparser.
  • Kolomnaam wijzigen: verwerkt door de notatieparser.
  • Verwijderde kolommen: verwerkt door de opmaakparser.
  • Type widening: Verwerkt door de formaatparser.
  • Andere typewijzigingen: verwerkt door de notatieparser.

Ondersteuning voor schemaontwikkeling door opmaakparser

from_json Parser

De from_json parser biedt geen ondersteuning voor de ontwikkeling van schema's. U moet het schema handmatig bijwerken. Wanneer u declaratieve pijplijnen van Lakeflow Spark gebruikt from_json , kan automatische schemaontwikkeling worden ingeschakeld met schemaLocationKey en schemaEvolutionMode.

  • Nieuwe kolommen: wanneer automatische schemaontwikkeling is ingeschakeld, gedraagt het zich als Automatisch laden.
  • Kolomhernoeming: wanneer automatische schemaontwikkeling is ingeschakeld, gedraagt het zich als Auto Loader.
  • Verwijderde kolommen: wanneer automatische schema-evolutie is ingeschakeld, gedraagt het zich als Auto Loader.
  • Type widening: Wanneer automatische schemaontwikkeling is ingeschakeld, gedraagt het zich als Auto Loader.
  • Andere typewijzigingen: wanneer automatische schemaontwikkeling is ingeschakeld, gedraagt het zich als Automatisch laden.

from_avro en from_protobuf parsers

De from_avro en from_protobuf parsers gedragen zich op dezelfde manier. Het schema kan worden opgehaald uit het Confluent-schemaregister of de gebruiker kan een schema opgeven en moet het schema handmatig bijwerken. Er is geen concept van schemaontwikkeling binnen de from_avro of from_protobuf functie; deze moet worden verwerkt door de uitvoeringsengine en het schemaregister.

  • Nieuwe kolommen: Ondersteund met Confluent Schema Registry. Anders moet de gebruiker het schema handmatig bijwerken.
  • Kolomnaam wijzigen: ondersteund met Confluent Schema Registry. Anders moet de gebruiker het schema handmatig bijwerken.
  • Verwijderde kolommen: ondersteund met Confluent Schema Registry. Anders moet de gebruiker het schema handmatig bijwerken.
  • Type widening: ondersteund met Confluent Schema Registry. Anders moet de gebruiker het schema handmatig bijwerken.
  • Andere typewijzigingen: ondersteund met Confluent Schema Registry. Anders moet de gebruiker het schema handmatig bijwerken.

from_csv en from_xml parsers

De from_csv en from_xml parsers bieden geen ondersteuning voor schema-evolutie.

  • Nieuwe kolommen: niet ondersteund
  • Kolomnaam wijzigen: niet ondersteund
  • Verwijderde kolommen: niet ondersteund
  • Type breder maken: niet ondersteund
  • Andere typewijzigingen: niet ondersteund

Ondersteuning voor schemaontwikkeling per engine

Gestructureerd streamen

Het schema van een streamingquery is vergrendeld tijdens de planningsfase en alle microbatches hergebruiken dat plan zonder opnieuw te plannen. Als het bronschema halverwege de uitvoering verandert, mislukt de query en moet de gebruiker de streamingquery opnieuw starten, zodat Spark het nieuwe schema opnieuw kan plannen.

De gegevensset waarnaar de stroom schrijft, moet ook ondersteuning bieden voor de ontwikkeling van schema's.

  • Nieuwe kolommen: ondersteund. De query mislukt en u moet de stream opnieuw starten om het schema-mismatch op te lossen.
  • Kolomnaam wijzigen: ondersteund. De query mislukt en u moet de stream opnieuw starten om het schema-mismatch op te lossen.
  • Verwijderde kolommen: ondersteund. De query mislukt en u moet de stream opnieuw starten om het schema-mismatch op te lossen.
  • Type breder maken: ondersteund. De query mislukt en u moet de stream opnieuw starten om het schema-mismatch op te lossen.
  • Andere typewijzigingen: ondersteund. De query mislukt en u moet de stream opnieuw starten om het schema-mismatch op te lossen.

Ontwikkeling van schema's per gegevensset

Streamingtabellen

Streamingtabellen ondersteunen standaard samenvoegschema-evolutie gedrag. Het bijwerken van het schema vereist geen handmatig opnieuw opstarten, maar willekeurige schemawijzigingen vereisen een volledige vernieuwing.

  • Nieuwe kolommen: ondersteund. De query wordt automatisch opnieuw opgestart om de schema mismatch op te lossen.
  • Kolomnaam wijzigen: ondersteund. De query wordt opnieuw opgestart om de schemascheefstand op te lossen. De hernoemde kolom wordt behandeld als een nieuwe kolom toegevoegd.
  • Verwijderde kolommen: ondersteund. Verwijderde kolommen worden behandeld als zachte verwijdering, waarbij nieuwe rijen in de verwijderde kolom worden ingesteld op NULL.
  • Type breder maken: ondersteund. Typebreiding moet worden ingeschakeld op het niveau van de pipeline of direct op de tabel. Zie type-verbredding in Lakeflow Spark Declarative Pipelines.
  • Andere typewijzigingen: niet ondersteund. Voor het bijwerken van het schema is een volledige vernieuwing vereist.

Gematerealiseerde weergaven

Elke update van het schema of de definitiequery activeert een volledige hercomputing van de gerealiseerde weergave.

  • Nieuwe kolommen: Volledig opnieuw compileren geactiveerd.
  • Kolom hernoemen: volledige herberekening geactiveerd.
  • Verwijderde kolommen: volledig opnieuw compileren geactiveerd.
  • Type-verbreding: Volledige herberekening geactiveerd.
  • Andere typewijzigingen: volledig opnieuw compileren geactiveerd.

Delta-tabellen

Delta-tabellen ondersteunen verschillende configuraties om het tabelschema bij te werken, waaronder het wijzigen van de naam, het verwijderen en breder maken van het type kolommen zonder tabelgegevens opnieuw te schrijven. Ondersteunde configuraties zijn onder andere ontwikkeling van samenvoegschema's, kolomtoewijzing, type widening en overwriteSchema.

  • Nieuwe kolommen: ondersteund. Wordt automatisch ontwikkeld wanneer de ontwikkeling van het samenvoegschema is ingeschakeld, zonder dat een Delta-tabel opnieuw hoeft te worden geschreven. Als de ontwikkeling van het samenvoegschema niet is ingeschakeld, mislukken updates.
  • Kolomnaam wijzigen: ondersteund. Kan de naam wijzigen via handmatige ALTER TABLE DDL opdrachten waarvoor kolomtoewijzing is ingeschakeld. Er is geen herschrijf van een Delta-tabel vereist.
  • Verwijderde kolommen: ondersteund. Je kunt kolommen verwijderen via manuele ALTER TABLE DDL opdrachten met ingeschakelde kolomtoewijzing. Er is geen herschrijf van een Delta-tabel vereist.
  • Type breder maken: ondersteund. De typewijziging wordt automatisch toegepast wanneer typevergroting en schema-evolutie voor samenvoegen zijn ingeschakeld. U kunt kolommen breder maken via handmatige ALTER TABLE DDL opdrachten wanneer het type widening is ingeschakeld. Als geen van beide is geconfigureerd, mislukken de bewerkingen. Zie Widen-typen met automatische schemaontwikkeling.
  • Andere typewijzigingen: ondersteund, maar vereist een volledig herschrijven van de Delta-tabel. U moet inschakelen overwriteSchema, waardoor de Delta-tabel volledig kan worden herschreven. Anders mislukken bewerkingen.

Views

Als de weergave niet column_list overeenkomt met het nieuwe schema of als deze een query bevat die niet kan worden geparseerd, wordt de weergave ongeldig. Als dit niet het probleem is, kunt u de evolutie van het schema inschakelen voor typewijzigingen met SCHEMA TYPE EVOLUTION en voor typewijzigingen, evenals nieuwe, hernoemde en verwijderde kolommen met SCHEMA EVOLUTION (een superset van typeontwikkeling).

  • Nieuwe kolommen: ondersteund. Met SCHEMA EVOLUTION de modus verandert de weergave automatisch zonder handmatige tussenkomst als er geen expliciete column_listactie is. Anders kan de weergave ongeldig worden en kan de gebruiker er geen query's op uitvoeren.
  • Kolommen hernoemen: ondersteund. Met SCHEMA EVOLUTION de modus verandert de weergave automatisch zonder handmatige tussenkomst als er geen expliciete column_listactie is. Anders kan de weergave ongeldig worden.
  • Verwijderde kolommen: ondersteund. Met SCHEMA EVOLUTION de modus verandert de weergave automatisch zonder handmatige tussenkomst als er geen expliciete column_listactie is. Anders kan de weergave ongeldig worden.
  • Type breder maken: ondersteund. Met SCHEMA TYPE EVOLUTION modus evolueert de weergave automatisch voor elke typewijziging. Met SCHEMA EVOLUTION de modus verandert de weergave automatisch zonder handmatige tussenkomst als er geen expliciete column_listactie is. Anders kan de weergave ongeldig worden.
  • Andere typewijzigingen: ondersteund. Met SCHEMA TYPE EVOLUTION modus evolueert de weergave automatisch voor elke typewijziging. Met SCHEMA EVOLUTION de modus verandert de weergave automatisch zonder handmatige tussenkomst als er geen expliciete column_listactie is. Anders kan de weergave ongeldig worden.

Example

In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp kunt opnemen met Avro-gecodeerde nettoladingen die zijn geregistreerd in Confluent Schema Registry en deze naar een beheerde Delta-tabel schrijft waarvoor schemaontwikkeling is ingeschakeld.

Belangrijke punten geïllustreerd:

  • Integreer met de Kafka-connector.
  • Avro-records decoderen met behulp van from_avro met een Kafka-schemaregister.
  • De evolutie van het schema afhandelen door het instellen avroSchemaEvolutionMode.
  • Schrijf naar een Delta-tabel met mergeSchema ingeschakeld om additieve wijzigingen toe te staan.

In de code wordt ervan uitgegaan dat u een Kafka-onderwerp hebt met behulp van het Confluent-schemaregister, waarbij avro-gecodeerde gegevens worden uitgevoerd.

# ----- CONFIG: fill these in -----
# Catalog and schema:
CATALOG = "<catalog_name>"
SCHEMA = "<schema_name>"
# Schema Registry:
# (This is where the producer evolves the schema)
SCHEMA_REG = "<schema registry endpoint>"
SR_USER    = "<api key>"
SR_PASS    = "<api secret>"
# Confluent Cloud: SASL_SSL broker:
BOOTSTRAP = "<server:ip>"
# Kafka topic:
TOPIC = "<topic>"
# ----- end: config -----

BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_users"
CHECKPOINT = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/bronze_users"

# Kafka auth (example for Confluent Cloud SASL/PLAIN over SSL)
KAFKA_OPTS = {
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{SR_USER}' password='{SR_PASS}';"
}

# ----- Evolution knobs -----
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = True)

from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

# Build reader
reader = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BOOTSTRAP)
  .option("subscribe", TOPIC)
  .option("startingOffsets", "earliest")
)

# Attach Kafka auth options
for k, v in KAFKA_OPTS.items():
    reader = reader.option(k, v)

# --- No native schema evolution supported. Returns a binary blob. ---
raw_df = reader.load()

# Decode Avro with Schema Registry
# --- The format parser handles updating the schema using the schema registry ---
decoded = from_avro(
    data=col("value"),
    jsonFormatSchema=None, # using SR
    subject=f"{TOPIC}-value",
    schemaRegistryAddress=SCHEMA_REG,
    options={
      "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info": f"{SR_USER}:{SR_PASS}",
      # Behavior on schema changes:
      "avroSchemaEvolutionMode": "restart", # fail-fast so you can restart and adopt new fields
      "mode": "FAILFAST"
    }
).alias("payload")

bronze_df = raw_df.select(decoded, "timestamp").select("payload.*", "timestamp")

# Write to a managed Delta table as a STREAM
# --- Need to enable schema evolution separately for streaming to a Delta separately with mergeSchema --
(bronze_df.writeStream
  .format("delta")
  .option("checkpointLocation", CHECKPOINT)
  .option("ignoreChanges", "true")
  .outputMode("append")
  .option("mergeSchema", "true") # only supports adding new columns. Renaming, dropping, and type changes need to be handled separately.
  .trigger(availableNow=True) # Use availableNow trigger for Databricks SQL/Unity Catalog
  .toTable(BRONZE_TABLE)
)