Delen via


Schemadeductie en ontwikkeling configureren in automatisch laadprogramma

U kunt automatisch laden configureren om het schema van geladen gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd.

AutoLoader kan ook gegevens 'redden' die onverwacht waren (bijvoorbeeld van verschillende gegevenstypen) in een JSON-blobkolom, die u later kunt openen met behulp van de semi-gestructureerde API's voor gegevenstoegang.

De volgende indelingen worden ondersteund voor schemadeductie en evolutie:

File format Ondersteunde versies
JSON Alle versies
CSV Alle versies
XML Databricks Runtime 14.3 LTS en hoger
Avro Databricks Runtime 10.4 LTS en hoger
Parquet Databricks Runtime 11.3 LTS en hoger
ORC Niet ondersteund
Text Niet van toepassing (vast schema)
Binaryfile Niet van toepassing (vast schema)

Syntaxis voor schemadeductie en evolutie

Als u een doelmap voor de optie cloudFiles.schemaLocation opgeeft, kunt u schemadeductie en ontwikkeling inschakelen. U kunt ervoor kiezen om dezelfde map te gebruiken die u voor checkpointLocation opgeeft. Als u Delta Live Tables gebruikt, beheert Azure Databricks automatisch schemalocatie en andere controlepuntgegevens.

Notitie

Als er meer dan één brongegevenslocatie in de doeltabel wordt geladen, is voor elke opnameworkload voor het automatisch laden een afzonderlijk streaming-controlepunt vereist.

In het volgende voorbeeld wordt parquet voor de cloudFiles.format. Gebruik csv, avroof json voor andere bestandsbronnen. Alle andere instellingen voor lezen en schrijven blijven hetzelfde voor het standaardgedrag voor elke indeling.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Hoe werkt deductie van het automatisch laadprogramma?

Als u het schema wilt afleiden wanneer u gegevens voor het eerst leest, worden de eerste 50 GB of 1000 bestanden die worden gedetecteerd door autolaadprogramma's gesampt, afhankelijk van welke limiet het eerst wordt overschreden. Met automatisch laden worden de schemagegevens opgeslagen in een map _schemas die is geconfigureerd cloudFiles.schemaLocation om schemawijzigingen in de invoergegevens in de loop van de tijd bij te houden.

Notitie

Als u de grootte van het gebruikte voorbeeld wilt wijzigen, kunt u de SQL-configuraties instellen:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(bytetekenreeks, bijvoorbeeld 10gb)

en

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(geheel getal)

Standaard wordt in deductie van het automatisch laadprogramma gezocht om problemen met de ontwikkeling van schema's te voorkomen als gevolg van niet-overeenkomende typen. Voor indelingen die geen gegevenstypen coderen (JSON, CSV en XML), worden alle kolommen als tekenreeksen afgeleid (inclusief geneste velden in JSON-bestanden). Voor indelingen met getypt schema (Parquet en Avro) wordt met Auto Loader een subset bestanden gesampleerd en worden de schema's van afzonderlijke bestanden samengevoegd. Dit gedrag wordt samengevat in de volgende tabel:

File format Standaardgegevenstype voor uitgestelde gegevens
JSON String
CSV String
XML String
Avro Typen die zijn gecodeerd in avro-schema
Parquet Typen die zijn gecodeerd in Parquet-schema

De Apache Spark DataFrameReader gebruikt ander gedrag voor schemadeductie, het selecteren van gegevenstypen voor kolommen in JSON-, CSV- en XML-bronnen op basis van voorbeeldgegevens. Als u dit gedrag wilt inschakelen met automatisch laden, stelt u de optie cloudFiles.inferColumnTypes in op true.

Notitie

Bij het uitstellen van het schema voor CSV-gegevens wordt ervan uitgegaan dat de bestanden headers bevatten. Als uw CSV-bestanden geen headers bevatten, geeft u de optie .option("header", "false")op. Daarnaast worden de schema's van alle bestanden in het voorbeeld samengevoegd met een globaal schema. Automatisch laden kan vervolgens elk bestand lezen op basis van de header en het CSV correct parseren.

Notitie

Wanneer een kolom verschillende gegevenstypen in twee Parquet-bestanden heeft, kiest Auto Loader het breedste type. U kunt schemaHints gebruiken om deze keuze te overschrijven. Wanneer u schemahints opgeeft, wordt de kolom niet door autolader naar het opgegeven type gecast, maar wordt de Parquet-lezer gevraagd de kolom als het opgegeven type te lezen. In het geval van een niet-overeenkomende kolom wordt de kolom in de kolom met geredde gegevens gered.

Hoe werkt de evolutie van het autoladerschema?

Auto Loader detecteert de toevoeging van nieuwe kolommen terwijl deze uw gegevens verwerkt. Wanneer automatisch laden een nieuwe kolom detecteert, stopt de stroom met een UnknownFieldException. Voordat uw stream deze fout genereert, voert Auto Loader schemadeductie uit op de meest recente microbatch met gegevens en werkt de schemalocatie bij met het nieuwste schema door nieuwe kolommen samen te voegen aan het einde van het schema. De gegevenstypen van bestaande kolommen blijven ongewijzigd.

Databricks raadt u aan om Automatisch laden te configureren met Databricks-taken om automatisch opnieuw op te starten nadat dergelijke schemawijzigingen zijn gewijzigd.

Auto Loader ondersteunt de volgende modi voor schemaontwikkeling, die u in de optie cloudFiles.schemaEvolutionModeinstelt:

Modus Gedrag bij het lezen van nieuwe kolom
addNewColumns (standaard) Stream mislukt. Er worden nieuwe kolommen aan het schema toegevoegd. Bestaande kolommen ontwikkelen geen gegevenstypen.
rescue Het schema is nooit ontwikkeld en de stroom mislukt niet vanwege schemawijzigingen. Alle nieuwe kolommen worden vastgelegd in de kolom met geredde gegevens.
failOnNewColumns Stream mislukt. Stream wordt niet opnieuw opgestart, tenzij het opgegeven schema wordt bijgewerkt of het offending-gegevensbestand wordt verwijderd.
none Het schema wordt niet aangepast, nieuwe kolommen worden genegeerd en gegevens worden niet gered, tenzij de rescuedDataColumn optie is ingesteld. Stream mislukt niet vanwege schemawijzigingen.

Hoe werken partities met automatische laadprogramma's?

Auto Loader probeert partitiekolommen af te stellen van de onderliggende mapstructuur van de gegevens als de gegevens zijn ingedeeld in Hive-stijl partitionering. Het bestandspad base_path/event=click/date=2021-04-01/f0.json resulteert bijvoorbeeld in de deductie van date en event als partitiekolommen. Als de onderliggende mapstructuur conflicterende Hive-partities bevat of geen Hive-stijlpartitionering bevat, worden partitiekolommen genegeerd.

Binaire bestanden (binaryFile) en text bestandsindelingen hebben vaste gegevensschema's, maar ondersteunen deductie van partitiekolommen. Databricks raadt de instelling cloudFiles.schemaLocation voor deze bestandsindelingen aan. Dit voorkomt mogelijke fouten of gegevensverlies en voorkomt deductie van partitiekolommen telkens wanneer een automatisch laadprogramma begint.

Partitiekolommen worden niet in aanmerking genomen voor de ontwikkeling van schema's. Als u een oorspronkelijke mapstructuur hebt zoals base_path/event=click/date=2021-04-01/f0.json, en vervolgens nieuwe bestanden ontvangt, base_path/event=click/date=2021-04-01/hour=01/f1.jsonnegeert automatisch laden de uurkolom. Als u informatie voor nieuwe partitiekolommen wilt vastleggen, stelt u deze in cloudFiles.partitionColumns op event,date,hour.

Notitie

De optie cloudFiles.partitionColumns maakt een door komma's gescheiden lijst met kolomnamen. Alleen kolommen die bestaan als key=value paren in uw mapstructuur, worden geparseerd.

Wat is de kolom met geredde gegevens?

Wanneer automatisch laden het schema affert, wordt er automatisch een opgeslagen gegevenskolom aan uw schema toegevoegd als _rescued_data. U kunt de naam van de kolom wijzigen of opnemen in gevallen waarin u een schema opgeeft door de optie rescuedDataColumnin te stellen.

De kolom met geredde gegevens zorgt ervoor dat kolommen die niet overeenkomen met het schema, worden gered in plaats van te worden verwijderd. De kolom met geredde gegevens bevat gegevens die om de volgende redenen niet worden geparseerd:

  • De kolom ontbreekt in het schema.
  • Type komt niet overeen.
  • Hoofdletters en kleine letters komen niet overeen.

De kolom met geredde gegevens bevat een JSON met de geredde kolommen en het pad naar het bronbestand van de record.

Notitie

De JSON- en CSV-parsers ondersteunen drie modi bij het parseren van records: PERMISSIVE, DROPMALFORMEDen FAILFAST. Wanneer gegevenstypen samen met rescuedDataColumnelkaar worden gebruikt, komen records niet overeen in DROPMALFORMED de modus of veroorzaken ze een fout in FAILFAST de modus. Alleen beschadigde records worden verwijderd of veroorzaken fouten, zoals onvolledige of onjuiste JSON of CSV. Als u badRecordsPath gebruikt bij het parseren van JSON of CSV, worden gegevenstypen niet beschouwd als ongeldige records bij gebruik van de rescuedDataColumn. Alleen onvolledige en onjuiste JSON- of CSV-records worden opgeslagen in badRecordsPath.

Hoofdlettergevoelig gedrag wijzigen

Tenzij hoofdlettergevoeligheid is ingeschakeld, worden de kolommen abcen AbcABC dezelfde kolom beschouwd voor schemadeductie. Het gekozen geval is willekeurig en is afhankelijk van de voorbeeldgegevens. U kunt schemahints gebruiken om af te dwingen welk geval moet worden gebruikt. Zodra een selectie is gemaakt en het schema is afgeleid, beschouwt Auto Loader niet de behuizingsvarianten die niet consistent zijn geselecteerd met het schema.

Wanneer de kolom met geredde gegevens is ingeschakeld, worden velden met de naam in een ander geval dan dat van het schema geladen in de _rescued_data kolom. Wijzig dit gedrag door de optie readerCaseSensitive in te stellen op onwaar, in welk geval Automatisch laadprogramma gegevens op een niet-hoofdlettergevoelige manier leest.

Schemadeductie overschrijven met schemahints

U kunt schemahints gebruiken om de schema-informatie af te dwingen die u kent en verwacht in een afgeleid schema. Wanneer u weet dat een kolom van een specifiek gegevenstype is of als u een meer algemeen gegevenstype wilt kiezen (bijvoorbeeld een double in plaats van een integer), kunt u een willekeurig aantal hints voor kolomgegevenstypen opgeven als een tekenreeks met de syntaxis van de SQL-schemaspecificatie, zoals het volgende:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Zie de documentatie over gegevenstypen voor de lijst met ondersteunde gegevenstypen.

Als een kolom niet aan het begin van de stroom aanwezig is, kunt u ook schemahints gebruiken om die kolom toe te voegen aan het uitgestelde schema.

Hier volgt een voorbeeld van een afgeleid schema om het gedrag te zien met schemahints.

Afgeleid schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Door de volgende schemahints op te geven:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

u krijgt:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Notitie

Ondersteuning voor matrix- en kaartschemahints is beschikbaar in Databricks Runtime 9.1 LTS en hoger.

Hier volgt een voorbeeld van een afgeleid schema met complexe gegevenstypen om het gedrag te zien met schemahints.

Afgeleid schema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Door de volgende schemahints op te geven:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

u krijgt:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Notitie

Schemahints worden alleen gebruikt als u geen schema voor automatisch laden opgeeft. U kunt schemahints gebruiken, ongeacht cloudFiles.inferColumnTypes of deze is ingeschakeld of uitgeschakeld.