Schemadeductie en ontwikkeling configureren in automatisch laadprogramma
U kunt automatisch laden configureren om het schema van geladen gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd.
AutoLoader kan ook gegevens 'redden' die onverwacht waren (bijvoorbeeld van verschillende gegevenstypen) in een JSON-blobkolom, die u later kunt openen met behulp van de semi-gestructureerde API's voor gegevenstoegang.
De volgende indelingen worden ondersteund voor schemadeductie en evolutie:
File format | Ondersteunde versies |
---|---|
JSON |
Alle versies |
CSV |
Alle versies |
XML |
Databricks Runtime 14.3 LTS en hoger |
Avro |
Databricks Runtime 10.4 LTS en hoger |
Parquet |
Databricks Runtime 11.3 LTS en hoger |
ORC |
Niet ondersteund |
Text |
Niet van toepassing (vast schema) |
Binaryfile |
Niet van toepassing (vast schema) |
Syntaxis voor schemadeductie en evolutie
Als u een doelmap voor de optie cloudFiles.schemaLocation
opgeeft, kunt u schemadeductie en ontwikkeling inschakelen. U kunt ervoor kiezen om dezelfde map te gebruiken die u voor checkpointLocation
opgeeft. Als u Delta Live Tables gebruikt, beheert Azure Databricks automatisch schemalocatie en andere controlepuntgegevens.
Notitie
Als er meer dan één brongegevenslocatie in de doeltabel wordt geladen, is voor elke opnameworkload voor het automatisch laden een afzonderlijk streaming-controlepunt vereist.
In het volgende voorbeeld wordt parquet
voor de cloudFiles.format
. Gebruik csv
, avro
of json
voor andere bestandsbronnen. Alle andere instellingen voor lezen en schrijven blijven hetzelfde voor het standaardgedrag voor elke indeling.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Hoe werkt deductie van het automatisch laadprogramma?
Als u het schema wilt afleiden wanneer u gegevens voor het eerst leest, worden de eerste 50 GB of 1000 bestanden die worden gedetecteerd door autolaadprogramma's gesampt, afhankelijk van welke limiet het eerst wordt overschreden. Met automatisch laden worden de schemagegevens opgeslagen in een map _schemas
die is geconfigureerd cloudFiles.schemaLocation
om schemawijzigingen in de invoergegevens in de loop van de tijd bij te houden.
Notitie
Als u de grootte van het gebruikte voorbeeld wilt wijzigen, kunt u de SQL-configuraties instellen:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(bytetekenreeks, bijvoorbeeld 10gb
)
en
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(geheel getal)
Standaard wordt in deductie van het automatisch laadprogramma gezocht om problemen met de ontwikkeling van schema's te voorkomen als gevolg van niet-overeenkomende typen. Voor indelingen die geen gegevenstypen coderen (JSON, CSV en XML), worden alle kolommen als tekenreeksen afgeleid (inclusief geneste velden in JSON-bestanden). Voor indelingen met getypt schema (Parquet en Avro) wordt met Auto Loader een subset bestanden gesampleerd en worden de schema's van afzonderlijke bestanden samengevoegd. Dit gedrag wordt samengevat in de volgende tabel:
File format | Standaardgegevenstype voor uitgestelde gegevens |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Typen die zijn gecodeerd in avro-schema |
Parquet |
Typen die zijn gecodeerd in Parquet-schema |
De Apache Spark DataFrameReader gebruikt ander gedrag voor schemadeductie, het selecteren van gegevenstypen voor kolommen in JSON-, CSV- en XML-bronnen op basis van voorbeeldgegevens. Als u dit gedrag wilt inschakelen met automatisch laden, stelt u de optie cloudFiles.inferColumnTypes
in op true
.
Notitie
Bij het uitstellen van het schema voor CSV-gegevens wordt ervan uitgegaan dat de bestanden headers bevatten. Als uw CSV-bestanden geen headers bevatten, geeft u de optie .option("header", "false")
op. Daarnaast worden de schema's van alle bestanden in het voorbeeld samengevoegd met een globaal schema. Automatisch laden kan vervolgens elk bestand lezen op basis van de header en het CSV correct parseren.
Notitie
Wanneer een kolom verschillende gegevenstypen in twee Parquet-bestanden heeft, kiest Auto Loader het breedste type. U kunt schemaHints gebruiken om deze keuze te overschrijven. Wanneer u schemahints opgeeft, wordt de kolom niet door autolader naar het opgegeven type gecast, maar wordt de Parquet-lezer gevraagd de kolom als het opgegeven type te lezen. In het geval van een niet-overeenkomende kolom wordt de kolom in de kolom met geredde gegevens gered.
Hoe werkt de evolutie van het autoladerschema?
Auto Loader detecteert de toevoeging van nieuwe kolommen terwijl deze uw gegevens verwerkt. Wanneer automatisch laden een nieuwe kolom detecteert, stopt de stroom met een UnknownFieldException
. Voordat uw stream deze fout genereert, voert Auto Loader schemadeductie uit op de meest recente microbatch met gegevens en werkt de schemalocatie bij met het nieuwste schema door nieuwe kolommen samen te voegen aan het einde van het schema. De gegevenstypen van bestaande kolommen blijven ongewijzigd.
Databricks raadt u aan om Automatisch laden te configureren met Databricks-taken om automatisch opnieuw op te starten nadat dergelijke schemawijzigingen zijn gewijzigd.
Auto Loader ondersteunt de volgende modi voor schemaontwikkeling, die u in de optie cloudFiles.schemaEvolutionMode
instelt:
Modus | Gedrag bij het lezen van nieuwe kolom |
---|---|
addNewColumns (standaard) |
Stream mislukt. Er worden nieuwe kolommen aan het schema toegevoegd. Bestaande kolommen ontwikkelen geen gegevenstypen. |
rescue |
Het schema is nooit ontwikkeld en de stroom mislukt niet vanwege schemawijzigingen. Alle nieuwe kolommen worden vastgelegd in de kolom met geredde gegevens. |
failOnNewColumns |
Stream mislukt. Stream wordt niet opnieuw opgestart, tenzij het opgegeven schema wordt bijgewerkt of het offending-gegevensbestand wordt verwijderd. |
none |
Het schema wordt niet aangepast, nieuwe kolommen worden genegeerd en gegevens worden niet gered, tenzij de rescuedDataColumn optie is ingesteld. Stream mislukt niet vanwege schemawijzigingen. |
Hoe werken partities met automatische laadprogramma's?
Auto Loader probeert partitiekolommen af te stellen van de onderliggende mapstructuur van de gegevens als de gegevens zijn ingedeeld in Hive-stijl partitionering. Het bestandspad base_path/event=click/date=2021-04-01/f0.json
resulteert bijvoorbeeld in de deductie van date
en event
als partitiekolommen. Als de onderliggende mapstructuur conflicterende Hive-partities bevat of geen Hive-stijlpartitionering bevat, worden partitiekolommen genegeerd.
Binaire bestanden (binaryFile
) en text
bestandsindelingen hebben vaste gegevensschema's, maar ondersteunen deductie van partitiekolommen. Databricks raadt de instelling cloudFiles.schemaLocation
voor deze bestandsindelingen aan. Dit voorkomt mogelijke fouten of gegevensverlies en voorkomt deductie van partitiekolommen telkens wanneer een automatisch laadprogramma begint.
Partitiekolommen worden niet in aanmerking genomen voor de ontwikkeling van schema's. Als u een oorspronkelijke mapstructuur hebt zoals base_path/event=click/date=2021-04-01/f0.json
, en vervolgens nieuwe bestanden ontvangt, base_path/event=click/date=2021-04-01/hour=01/f1.json
negeert automatisch laden de uurkolom. Als u informatie voor nieuwe partitiekolommen wilt vastleggen, stelt u deze in cloudFiles.partitionColumns
op event,date,hour
.
Notitie
De optie cloudFiles.partitionColumns
maakt een door komma's gescheiden lijst met kolomnamen. Alleen kolommen die bestaan als key=value
paren in uw mapstructuur, worden geparseerd.
Wat is de kolom met geredde gegevens?
Wanneer automatisch laden het schema affert, wordt er automatisch een opgeslagen gegevenskolom aan uw schema toegevoegd als _rescued_data
. U kunt de naam van de kolom wijzigen of opnemen in gevallen waarin u een schema opgeeft door de optie rescuedDataColumn
in te stellen.
De kolom met geredde gegevens zorgt ervoor dat kolommen die niet overeenkomen met het schema, worden gered in plaats van te worden verwijderd. De kolom met geredde gegevens bevat gegevens die om de volgende redenen niet worden geparseerd:
- De kolom ontbreekt in het schema.
- Type komt niet overeen.
- Hoofdletters en kleine letters komen niet overeen.
De kolom met geredde gegevens bevat een JSON met de geredde kolommen en het pad naar het bronbestand van de record.
Notitie
De JSON- en CSV-parsers ondersteunen drie modi bij het parseren van records: PERMISSIVE
, DROPMALFORMED
en FAILFAST
. Wanneer gegevenstypen samen met rescuedDataColumn
elkaar worden gebruikt, komen records niet overeen in DROPMALFORMED
de modus of veroorzaken ze een fout in FAILFAST
de modus. Alleen beschadigde records worden verwijderd of veroorzaken fouten, zoals onvolledige of onjuiste JSON of CSV. Als u badRecordsPath
gebruikt bij het parseren van JSON of CSV, worden gegevenstypen niet beschouwd als ongeldige records bij gebruik van de rescuedDataColumn
. Alleen onvolledige en onjuiste JSON- of CSV-records worden opgeslagen in badRecordsPath
.
Hoofdlettergevoelig gedrag wijzigen
Tenzij hoofdlettergevoeligheid is ingeschakeld, worden de kolommen abc
en Abc
ABC
dezelfde kolom beschouwd voor schemadeductie. Het gekozen geval is willekeurig en is afhankelijk van de voorbeeldgegevens. U kunt schemahints gebruiken om af te dwingen welk geval moet worden gebruikt. Zodra een selectie is gemaakt en het schema is afgeleid, beschouwt Auto Loader niet de behuizingsvarianten die niet consistent zijn geselecteerd met het schema.
Wanneer de kolom met geredde gegevens is ingeschakeld, worden velden met de naam in een ander geval dan dat van het schema geladen in de _rescued_data
kolom. Wijzig dit gedrag door de optie readerCaseSensitive
in te stellen op onwaar, in welk geval Automatisch laadprogramma gegevens op een niet-hoofdlettergevoelige manier leest.
Schemadeductie overschrijven met schemahints
U kunt schemahints gebruiken om de schema-informatie af te dwingen die u kent en verwacht in een afgeleid schema. Wanneer u weet dat een kolom van een specifiek gegevenstype is of als u een meer algemeen gegevenstype wilt kiezen (bijvoorbeeld een double
in plaats van een integer
), kunt u een willekeurig aantal hints voor kolomgegevenstypen opgeven als een tekenreeks met de syntaxis van de SQL-schemaspecificatie, zoals het volgende:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Zie de documentatie over gegevenstypen voor de lijst met ondersteunde gegevenstypen.
Als een kolom niet aan het begin van de stroom aanwezig is, kunt u ook schemahints gebruiken om die kolom toe te voegen aan het uitgestelde schema.
Hier volgt een voorbeeld van een afgeleid schema om het gedrag te zien met schemahints.
Afgeleid schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Door de volgende schemahints op te geven:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
u krijgt:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Notitie
Ondersteuning voor matrix- en kaartschemahints is beschikbaar in Databricks Runtime 9.1 LTS en hoger.
Hier volgt een voorbeeld van een afgeleid schema met complexe gegevenstypen om het gedrag te zien met schemahints.
Afgeleid schema:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Door de volgende schemahints op te geven:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
u krijgt:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Notitie
Schemahints worden alleen gebruikt als u geen schema voor automatisch laden opgeeft. U kunt schemahints gebruiken, ongeacht cloudFiles.inferColumnTypes
of deze is ingeschakeld of uitgeschakeld.