Schemadeductie en ontwikkeling configureren in automatisch laadprogramma
U kunt automatisch laden configureren om het schema van geladen gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd.
AutoLoader kan ook gegevens 'redden' die onverwacht waren (bijvoorbeeld van verschillende gegevenstypen) in een JSON-blobkolom, die u later kunt openen met behulp van de semi-gestructureerde API's voor gegevenstoegang.
De volgende indelingen worden ondersteund voor schemadeductie en evolutie:
Bestandsformaat | Ondersteunde versies |
---|---|
JSON |
Alle versies |
CSV |
Alle versies |
XML |
Databricks Runtime 14.3 LTS en hoger |
Avro |
Databricks Runtime 10.4 LTS en hoger |
Parquet |
Databricks Runtime 11.3 LTS en hoger |
ORC |
Niet ondersteund |
Text |
Niet van toepassing (vast schema) |
Binaryfile |
Niet van toepassing (vast schema) |
Syntaxis voor schemadeductie en evolutie
Als u een doelmap voor de optie cloudFiles.schemaLocation
opgeeft, kunt u schemadeductie en ontwikkeling inschakelen. U kunt ervoor kiezen om dezelfde map te gebruiken die u voor checkpointLocation
opgeeft. Als u DLT-gebruikt, beheert Azure Databricks de schemalocatie en andere controlepuntgegevens automatisch.
Notitie
Als er meer dan één brongegevenslocatie in de doeltabel wordt geladen, is voor elke opnameworkload voor het automatisch laden een afzonderlijk streaming-controlepunt vereist.
Het volgende voorbeeld gebruikt parquet
voor de cloudFiles.format
. Gebruik csv
, avro
of json
voor andere bestandsbronnen. Alle andere instellingen voor lezen en schrijven blijven hetzelfde voor het standaardgedrag voor elke indeling.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Hoe werkt de schema-inferentie in Auto Loader?
Als u het schema wilt afleiden wanneer u gegevens voor het eerst leest, worden de eerste 50 GB of 1000 bestanden die worden gedetecteerd door autolaadprogramma's gesampt, afhankelijk van welke limiet het eerst wordt overschreden. Auto Loader slaat de schemagegevens op in een map _schemas
op de geconfigureerde locatie cloudFiles.schemaLocation
om schemawijzigingen in de invoergegevens in de loop van de tijd bij te houden.
Notitie
Als u de grootte van het gebruikte voorbeeld wilt wijzigen, kunt u de SQL-configuraties instellen:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(bytetekenreeks, bijvoorbeeld 10gb
)
en
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(geheel getal)
Standaard probeert de Auto Loader-schema-inferentie schema-evolutieproblemen te voorkomen als gevolg van type-mismatches. Voor formaten die geen gegevens typen bevatten (JSON, CSV en XML), worden alle kolommen als strings beschouwd (inclusief ingebedde velden in JSON-bestanden). Voor indelingen met getypt schema (Parquet en Avro) wordt met Auto Loader een subset bestanden gesampleerd en worden de schema's van afzonderlijke bestanden samengevoegd. Dit gedrag wordt samengevat in de volgende tabel:
Bestandsformaat | Standaard afgeleide gegevenstype |
---|---|
JSON |
String |
CSV |
Snaar |
XML |
String |
Avro |
Typen die zijn gecodeerd in avro-schema |
Parquet |
Typen die zijn gecodeerd in Parquet-schema |
De Apache Spark DataFrameReader gebruikt ander gedrag voor schemadeductie, het selecteren van gegevenstypen voor kolommen in JSON-, CSV- en XML-bronnen op basis van voorbeeldgegevens. Als u dit gedrag wilt inschakelen met automatisch laden, stelt u de optie cloudFiles.inferColumnTypes
in op true
.
Notitie
Bij het afleiden van het schema voor CSV-gegevens gaat Auto Loader ervan uit dat de bestanden headers bevatten. Als uw CSV-bestanden geen headers bevatten, geeft u de optie .option("header", "false")
op. Daarnaast voegt Auto Loader de schema's van alle bestanden in de steekproef samen om een globaal schema te vormen. Auto Loader kan vervolgens elk bestand lezen op basis van de header en de CSV-bestanden correct parseren.
Notitie
Wanneer een kolom verschillende gegevenstypen in twee Parquet-bestanden heeft, kiest Auto Loader het breedste type. U kunt schemaHints gebruiken om deze keuze te overschrijven. Wanneer u schemahints opgeeft, wordt de kolom niet door autolader naar het opgegeven type gecast, maar wordt de Parquet-lezer gevraagd de kolom als het opgegeven type te lezen. In geval van een mismatch wordt de kolom gered in de kolom met geredde gegevens.
Hoe werkt schema-evolutie voor Auto Loader?
Auto Loader detecteert de toevoeging van nieuwe kolommen terwijl deze uw gegevens verwerkt. Wanneer Auto Loader een nieuwe kolom detecteert, wordt de stream gestopt met een UnknownFieldException
. Voordat uw stream deze fout genereert, voert Auto Loader schemadeductie uit op de meest recente microbatch met gegevens en werkt de schemalocatie bij met het nieuwste schema door nieuwe kolommen samen te voegen aan het einde van het schema. De gegevenstypen van bestaande kolommen blijven ongewijzigd.
Databricks raadt aan om Auto Loader-streams te configureren met Databricks Jobs zodat deze automatisch opnieuw opstarten na dergelijke schemawijzigingen.
Auto Loader ondersteunt de volgende modi voor schemaontwikkeling, die u in de optie cloudFiles.schemaEvolutionMode
instelt:
Modus | Gedrag bij het lezen van nieuwe kolom |
---|---|
addNewColumns (standaard) |
Stream mislukt. Er worden nieuwe kolommen aan het schema toegevoegd. Bestaande kolommen ontwikkelen geen gegevenstypen. |
rescue |
Het schema is nooit ontwikkeld en de stroom mislukt niet vanwege schemawijzigingen. Alle nieuwe kolommen worden vastgelegd in de kolom met geredde gegevens. |
failOnNewColumns |
Stream mislukt. Stream wordt niet opnieuw opgestart, tenzij het opgegeven schema wordt bijgewerkt of het offending-gegevensbestand wordt verwijderd. |
none |
Het schema wordt niet aangepast, nieuwe kolommen worden genegeerd en gegevens worden niet gered, tenzij de rescuedDataColumn optie is ingesteld. Stream mislukt niet vanwege schemawijzigingen. |
Notitie
addNewColumns
is de standaardmodus wanneer er geen schema wordt opgegeven, maar none
de standaardmodus is wanneer u een schema opgeeft.
addNewColumns
is niet toegestaan wanneer het schema van de stroom wordt opgegeven, maar het werkt wel als u uw schema opgeeft als een schemahint.
Hoe werken partities met automatische laadprogramma's?
Auto Loader probeert partitiekolommen af te leiden van de onderliggende directorystructuur van de data als de data is ingedeeld in partitionering in Hive-stijl. Het bestandspad base_path/event=click/date=2021-04-01/f0.json
resulteert bijvoorbeeld in de deductie van date
en event
als partitiekolommen. Als de onderliggende mapstructuur conflicterende Hive-partities bevat of geen Hive-stijlpartitionering bevat, worden partitiekolommen genegeerd.
Binaire bestanden (binaryFile
) en text
bestandsindelingen hebben vaste gegevensschema's, maar ondersteunen deductie van partitiekolommen. Databricks raadt de instelling cloudFiles.schemaLocation
voor deze bestandsindelingen aan. Dit voorkomt mogelijke fouten of gegevensverlies en voorkomt deductie van partitiekolommen telkens wanneer een automatisch laadprogramma begint.
Partitiekolommen worden niet in aanmerking genomen voor de ontwikkeling van schema's. Als u een oorspronkelijke mapstructuur hebt zoals base_path/event=click/date=2021-04-01/f0.json
en vervolgens nieuwe bestanden ontvangt als base_path/event=click/date=2021-04-01/hour=01/f1.json
, negeert automatisch laden de uurkolom. Als u informatie voor nieuwe partitiekolommen wilt vastleggen, stelt u cloudFiles.partitionColumns
in op event,date,hour
.
Notitie
De optie cloudFiles.partitionColumns
maakt een door komma's gescheiden lijst met kolomnamen. Alleen kolommen die bestaan als key=value
paren in uw mapstructuur, worden geparseerd.
Wat is de kolom met geredde gegevens?
Wanneer Auto Loader het schema afleidt, wordt er automatisch een herstelgegevenskolom aan uw schema toegevoegd als _rescued_data
. U kunt de naam van de kolom wijzigen of opnemen in gevallen waarin u een schema opgeeft door de optie rescuedDataColumn
in te stellen.
De kolom met geredde gegevens zorgt ervoor dat kolommen die niet overeenkomen met het schema, worden gered in plaats van te worden verwijderd. De kolom met geredde gegevens bevat gegevens die om de volgende redenen niet worden geparseerd:
- De kolom ontbreekt in het schema.
- Type komt niet overeen.
- Hoofdletters en kleine letters komen niet overeen.
De kolom met geredde gegevens bevat een JSON met de geredde kolommen en het pad naar het bronbestand van de record.
Notitie
De JSON- en CSV-parsers ondersteunen drie modi bij het parseren van records: PERMISSIVE
, DROPMALFORMED
en FAILFAST
. Wanneer rescuedDataColumn
samen met gegevenstypen wordt gebruikt, veroorzaakt dit niet dat records in de modus DROPMALFORMED
worden verwijderd en het veroorzaakt geen fout in de modus FAILFAST
. Alleen beschadigde records worden verwijderd of veroorzaken fouten, zoals onvolledige of onjuiste JSON of CSV. Als u badRecordsPath
gebruikt bij het parseren van JSON of CSV, worden verschillen in gegevenstypen niet beschouwd als slechte records bij gebruik van de rescuedDataColumn
. Alleen onvolledige en onjuiste JSON- of CSV-records worden opgeslagen in badRecordsPath
.
Hoofdlettergevoelig gedrag wijzigen
Tenzij hoofdlettergevoeligheid is ingeschakeld, worden de kolommen abc
en Abc
ABC
dezelfde kolom beschouwd voor schemadeductie. Het gekozen geval is willekeurig en is afhankelijk van de voorbeeldgegevens. U kunt schemahints gebruiken om af te dwingen welk geval moet worden gebruikt. Zodra een selectie is gemaakt en het schema is afgeleid, beschouwt Auto Loader niet de behuizingsvarianten die niet consistent zijn geselecteerd met het schema.
Wanneer de kolom met geredde gegevens is ingeschakeld, worden velden die met een andere letterkast dan die van het schema zijn genoemd, geladen in de _rescued_data
kolom. Wijzig dit gedrag door de optie readerCaseSensitive
op onwaar in te stellen, waardoor Auto Loader gegevens op een hoofdletterongevoelige manier leest.
Schemadeductie overschrijven met schemahints
U kunt schemahints gebruiken om de schema-informatie af te dwingen die u kent en verwacht in een afgeleid schema. Wanneer u weet dat een kolom van een specifiek gegevenstype is of als u een meer algemeen gegevenstype wilt kiezen (bijvoorbeeld een double
in plaats van een integer
), kunt u een willekeurig aantal hints voor kolomgegevenstypen opgeven als een tekenreeks met de syntaxis van de SQL-schemaspecificatie, zoals het volgende:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Zie de documentatie over gegevenstypen voor de lijst met ondersteunde gegevenstypen.
Als een kolom niet aan het begin van de gegevensstroom aanwezig is, kunt u ook schemahints gebruiken om die kolom toe te voegen aan het afgeleide schema.
Hier volgt een voorbeeld van een afgeleid schema om het gedrag te zien met schemahints.
Afgeleid schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Door de volgende schemahints op te geven:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
u krijgt:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Notitie
Ondersteuning voor matrix- en kaartschemahints is beschikbaar in Databricks Runtime 9.1 LTS en hoger.
Hier volgt een voorbeeld van een afgeleid schema met complexe gegevenstypen om het gedrag te zien met schemahints.
Afgeleid schema:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Door de volgende schemahints op te geven:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
u krijgt:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Notitie
Schemahints worden alleen gebruikt als u geen schema voor automatisch laden opgeeft. U kunt schemahints gebruiken, ongeacht cloudFiles.inferColumnTypes
of deze is ingeschakeld of uitgeschakeld.