Konfigurowanie wnioskowania i ewolucji schematu w module automatycznego ładowania
Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie.
Automatyczne ładowanie może również "ratować" dane, które były nieoczekiwane (na przykład różne typy danych) w kolumnie obiektu blob JSON, do których można później uzyskać dostęp przy użyciu interfejsów API dostępu do danych częściowo ustrukturyzowanych.
W przypadku wnioskowania i ewolucji schematu obsługiwane są następujące formaty:
File format | Obsługiwane wersje |
---|---|
JSON |
Wszystkie wersje |
CSV |
Wszystkie wersje |
XML |
Databricks Runtime 14.3 LTS i nowsze |
Avro |
Databricks Runtime 10.4 LTS i nowsze |
Parquet |
Databricks Runtime 11.3 LTS i nowsze |
ORC |
Nieobsługiwane |
Text |
Nie dotyczy (schemat stały) |
Binaryfile |
Nie dotyczy (schemat stały) |
Składnia wnioskowania i ewolucji schematu
Określenie katalogu docelowego opcji cloudFiles.schemaLocation
umożliwia wnioskowanie i ewolucję schematu. Możesz użyć tego samego katalogu, który został określony dla ustawienia checkpointLocation
. Jeśli używasz tabel delta live, usługa Azure Databricks automatycznie zarządza lokalizacją schematu i innymi informacjami o punkcie kontrolnym.
Uwaga
Jeśli do tabeli docelowej jest ładowana więcej niż jedna lokalizacja danych źródłowych, każde obciążenie pozyskiwania automatycznego modułu ładującego wymaga oddzielnego punktu kontrolnego przesyłania strumieniowego.
W poniższym przykładzie użyto parquet
elementu dla elementu cloudFiles.format
. Użyj csv
, avro
lub json
dla innych źródeł plików. Wszystkie inne ustawienia odczytu i zapisu pozostają takie same dla domyślnych zachowań dla każdego formatu.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Jak działa wnioskowanie schematu modułu automatycznego ładowania?
Aby wywnioskować schemat podczas pierwszego odczytywania danych, moduł ładujący automatycznie próbkuje pierwsze 50 GB lub 1000 wykrytych plików, w zależności od tego, który limit zostanie przekroczony jako pierwszy. Automatycznie ładujący przechowuje informacje o schemacie w katalogu _schemas
skonfigurowanym cloudFiles.schemaLocation
do śledzenia zmian schematu w danych wejściowych w czasie.
Uwaga
Aby zmienić rozmiar użytej próbki, możesz ustawić konfiguracje SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(ciąg bajtowy, na przykład 10gb
)
oraz
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(liczba całkowita)
Domyślnie wnioskowanie schematu modułu automatycznego ładowania ma na celu uniknięcie problemów z ewolucją schematu z powodu niezgodności typów. W przypadku formatów, które nie kodują typów danych (JSON, CSV i XML), moduł ładujący automatycznie wywnioskuje wszystkie kolumny jako ciągi (w tym pola zagnieżdżone w plikach JSON). W przypadku formatów ze schematem typowanym (Parquet i Avro) program Auto Loader próbkuje podzestaw plików i scala schematy poszczególnych plików. To zachowanie zostało podsumowane w poniższej tabeli:
File format | Domyślny typ danych wywnioskowanych |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Typy zakodowane w schemacie Avro |
Parquet |
Typy zakodowane w schemacie Parquet |
Element DataFrameReader platformy Apache Spark używa innego zachowania w przypadku wnioskowania schematu, wybierania typów danych dla kolumn w źródłach JSON, CSV i XML na podstawie przykładowych danych. Aby włączyć to zachowanie za pomocą automatycznego modułu ładującego, ustaw opcję cloudFiles.inferColumnTypes
na true
.
Uwaga
Podczas wnioskowania schematu dla danych CSV program Auto Loader zakłada, że pliki zawierają nagłówki. Jeśli pliki CSV nie zawierają nagłówków, podaj opcję .option("header", "false")
. Ponadto moduł automatycznego ładowania scala schematy wszystkich plików w przykładzie w celu utworzenia schematu globalnego. Funkcja automatycznego modułu ładującego może następnie odczytywać każdy plik zgodnie z nagłówkiem i poprawnie analizować wolumin CSV.
Uwaga
Jeśli kolumna ma różne typy danych w dwóch plikach Parquet, moduł ładujący automatycznie wybiera najszerszy typ. Możesz użyć schemaHints , aby zastąpić ten wybór. Po określeniu wskazówek schematu moduł ładujący automatycznie nie rzutuje kolumny do określonego typu, ale raczej informuje czytelnika Parquet o odczytaniu kolumny jako określonego typu. W przypadku niezgodności kolumna jest ratowana w uratowanej kolumnie danych.
Jak działa ewolucja schematu modułu automatycznego ładowania?
Moduł automatycznego ładowania wykrywa dodanie nowych kolumn podczas przetwarzania danych. Gdy funkcja automatycznego ładowania wykryje nową kolumnę, strumień zatrzymuje się przy użyciu elementu UnknownFieldException
. Przed zgłoszeniem tego błędu strumień automatycznie ładujący wykonuje wnioskowanie schematu na najnowszej mikrosadowej partii danych i aktualizuje lokalizację schematu przy użyciu najnowszego schematu, scalając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione.
Usługa Databricks zaleca skonfigurowanie strumieni automatycznego modułu ładującego za pomocą zadań usługi Databricks w celu automatycznego ponownego uruchomienia po wprowadzeniu takich zmian schematu.
Moduł automatycznego ładowania obsługuje następujące tryby ewolucji schematu, które można ustawić w opcji cloudFiles.schemaEvolutionMode
:
Tryb | Zachowanie podczas odczytywania nowej kolumny |
---|---|
addNewColumns (domyślne) |
Przesyłanie strumieniowe kończy się niepowodzeniem. Nowe kolumny są dodawane do schematu. Istniejące kolumny nie ewoluują typów danych. |
rescue |
Schemat nigdy nie ewoluował, a strumień nie kończy się niepowodzeniem z powodu zmian schematu. Wszystkie nowe kolumny są rejestrowane w uratowanej kolumnie danych. |
failOnNewColumns |
Przesyłanie strumieniowe kończy się niepowodzeniem. Usługa Stream nie zostanie ponownie uruchomiona, chyba że podany schemat zostanie zaktualizowany lub plik danych o przestępstwach zostanie usunięty. |
none |
Nie rozwija schematu, nowe kolumny są ignorowane, a dane nie są ratowane, chyba że opcja jest ustawiona rescuedDataColumn . Usługa Stream nie kończy się niepowodzeniem z powodu zmian schematu. |
Uwaga
addNewColumns
tryb jest domyślny, gdy nie podano schematu, ale none
jest ustawieniem domyślnym podczas podawania schematu.
addNewColumns
nie jest dopuszczalne, gdy podany jest schemat strumienia, ale działa, jeśli podajesz swój schemat jako wskazówkę w postaci schematu .
Jak działają partycje z modułem automatycznego ładowania?
Moduł automatycznego ładowania próbuje wywnioskować kolumny partycji z bazowej struktury katalogów danych, jeśli dane są określone w partycjonowaniu w stylu hive. Na przykład ścieżka base_path/event=click/date=2021-04-01/f0.json
pliku powoduje wnioskowanie date
kolumn i event
jako kolumny partycji. Jeśli podstawowa struktura katalogów zawiera sprzeczne partycje programu Hive lub nie zawiera partycjonowania stylu hive, kolumny partycji są ignorowane.
Formaty plików binarnych (binaryFile
) i text
plików mają stałe schematy danych, ale obsługują wnioskowanie kolumn partycji. Usługa Databricks zaleca ustawienie cloudFiles.schemaLocation
tych formatów plików. Zapobiega to wszelkim potencjalnym błędom lub utracie informacji i zapobiega wnioskowaniu kolumn partycji za każdym razem, gdy rozpocznie się automatyczne ładowanie.
Kolumny partycji nie są brane pod uwagę podczas ewolucji schematu. Jeśli masz początkową strukturę katalogów, na przykład base_path/event=click/date=2021-04-01/f0.json
, a następnie zacznij otrzymywać nowe pliki jako base_path/event=click/date=2021-04-01/hour=01/f1.json
, automatycznie ładujący ignoruje kolumnę godziny. Aby przechwycić informacje dotyczące nowych kolumn partycji, ustaw wartość cloudFiles.partitionColumns
event,date,hour
.
Uwaga
Opcja cloudFiles.partitionColumns
przyjmuje rozdzielaną przecinkami listę nazw kolumn. Analizowane są tylko kolumny, które istnieją jako key=value
pary w strukturze katalogu.
Jaka jest uratowana kolumna danych?
Gdy moduł automatycznego ładowania wywnioskuje schemat, uratowana kolumna danych zostanie automatycznie dodana do schematu jako _rescued_data
. Możesz zmienić nazwę kolumny lub dołączyć ją w przypadkach, w których podajesz schemat, ustawiając opcję rescuedDataColumn
.
Uratowana kolumna danych gwarantuje, że kolumny, które nie pasują do schematu, zostaną uratowane zamiast porzuconych. Uratowana kolumna danych zawiera wszystkie dane, które nie są analizowane z następujących powodów:
- Brak kolumny ze schematu.
- Niezgodność typów.
- Niezgodność wielkości liter.
Uratowana kolumna danych zawiera kod JSON zawierający uratowane kolumny i ścieżkę pliku źródłowego rekordu.
Uwaga
Analizatory JSON i CSV obsługują trzy tryby podczas analizowania rekordów: PERMISSIVE
, DROPMALFORMED
i FAILFAST
. W przypadku użycia razem z elementem rescuedDataColumn
niezgodność typów danych nie powoduje porzucenia rekordów w DROPMALFORMED
trybie lub zgłaszania błędu w FAILFAST
trybie. Tylko uszkodzone rekordy są porzucane lub zgłaszane błędy, takie jak niekompletne lub źle sformułowane pliki JSON lub CSV. Jeśli używasz badRecordsPath
podczas analizowania formatu JSON lub CSV, niezgodności typów danych nie są traktowane jako nieprawidłowe rekordy podczas korzystania z elementu rescuedDataColumn
. W pliku badRecordsPath
są przechowywane tylko niekompletne i źle sformułowane rekordy JSON lub CSV.
Zmiana zachowania uwzględniającego wielkość liter
Chyba że jest włączona czułość wielkości liter, kolumny abc
, Abc
i ABC
są traktowane jako ta sama kolumna na potrzeby wnioskowania schematu. Wybrany przypadek jest dowolny i zależy od przykładowych danych. Możesz użyć wskazówek schematu , aby wymusić użycie tego przypadku. Po zaznaczeniu i wywnioskowaniu schematu moduł automatycznego ładowania nie uwzględnia wariantów wielkości liter, które nie zostały wybrane zgodnie ze schematem.
Po włączeniu uratowanej kolumny danych pola nazwane w przypadku innym niż schemat są ładowane do kolumny _rescued_data
. Zmień to zachowanie, ustawiając opcję readerCaseSensitive
na false, w takim przypadku moduł automatycznego ładowania odczytuje dane w sposób bez uwzględniania wielkości liter.
Zastępowanie wnioskowania schematu za pomocą wskazówek schematu
Możesz użyć wskazówek schematu, aby wymusić informacje o schemacie, które znasz i których oczekujesz na wywnioskowanym schemacie. Jeśli wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład double
zamiast integer
), możesz podać dowolną liczbę wskazówek dla typów danych kolumn jako ciąg przy użyciu składni specyfikacji schematu SQL, takiej jak następujące:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Zapoznaj się z dokumentacją dotyczącą typów danych, aby uzyskać listę obsługiwanych typów danych.
Jeśli kolumna nie jest obecna na początku strumienia, możesz również użyć wskazówek schematu, aby dodać kolumnę do wnioskowanego schematu.
Oto przykład wnioskowanego schematu, aby zobaczyć zachowanie ze wskazówkami schematu.
Wywnioskowany schemat:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Określając następujące wskazówki schematu:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
otrzymasz:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Uwaga
Obsługa wskazówek schematów tablic i map jest dostępna w środowisku Databricks Runtime 9.1 LTS i nowszym.
Oto przykład wnioskowanego schematu ze złożonymi typami danych, aby zobaczyć zachowanie ze wskazówkami schematu.
Wywnioskowany schemat:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Określając następujące wskazówki schematu:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
otrzymasz:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Uwaga
Wskazówki schematu są używane tylko wtedy, gdy nie podasz schematu modułowi ładującego automatycznie. Możesz użyć wskazówek schematu, czy cloudFiles.inferColumnTypes
jest włączona, czy wyłączona.