Udostępnij za pośrednictwem


Konfigurowanie wnioskowania i ewolucji schematu w module automatycznego ładowania

Automatyczne ładowanie można skonfigurować tak, aby automatycznie wykrywał schemat załadowanych danych, umożliwiając inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie.

Automatyczne ładowanie może również "ratować" dane, które były nieoczekiwane (na przykład różne typy danych) w kolumnie obiektu blob JSON, do których można później uzyskać dostęp przy użyciu interfejsów API dostępu do danych częściowo ustrukturyzowanych.

W przypadku wnioskowania i ewolucji schematu obsługiwane są następujące formaty:

File format Obsługiwane wersje
JSON Wszystkie wersje
CSV Wszystkie wersje
XML Databricks Runtime 14.3 LTS i nowsze
Avro Databricks Runtime 10.4 LTS i nowsze
Parquet Databricks Runtime 11.3 LTS i nowsze
ORC Nieobsługiwane
Text Nie dotyczy (schemat stały)
Binaryfile Nie dotyczy (schemat stały)

Składnia wnioskowania i ewolucji schematu

Określenie katalogu docelowego opcji cloudFiles.schemaLocation umożliwia wnioskowanie i ewolucję schematu. Możesz użyć tego samego katalogu, który został określony dla ustawienia checkpointLocation. Jeśli używasz tabel delta live, usługa Azure Databricks automatycznie zarządza lokalizacją schematu i innymi informacjami o punkcie kontrolnym.

Uwaga

Jeśli do tabeli docelowej jest ładowana więcej niż jedna lokalizacja danych źródłowych, każde obciążenie pozyskiwania automatycznego modułu ładującego wymaga oddzielnego punktu kontrolnego przesyłania strumieniowego.

W poniższym przykładzie użyto parquet elementu dla elementu cloudFiles.format. Użyj csv, avrolub json dla innych źródeł plików. Wszystkie inne ustawienia odczytu i zapisu pozostają takie same dla domyślnych zachowań dla każdego formatu.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jak działa wnioskowanie schematu modułu automatycznego ładowania?

Aby wywnioskować schemat podczas pierwszego odczytywania danych, moduł ładujący automatycznie próbkuje pierwsze 50 GB lub 1000 wykrytych plików, w zależności od tego, który limit zostanie przekroczony jako pierwszy. Automatycznie ładujący przechowuje informacje o schemacie w katalogu _schemas skonfigurowanym cloudFiles.schemaLocation do śledzenia zmian schematu w danych wejściowych w czasie.

Uwaga

Aby zmienić rozmiar użytej próbki, możesz ustawić konfiguracje SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(ciąg bajtowy, na przykład 10gb)

oraz

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(liczba całkowita)

Domyślnie wnioskowanie schematu modułu automatycznego ładowania ma na celu uniknięcie problemów z ewolucją schematu z powodu niezgodności typów. W przypadku formatów, które nie kodują typów danych (JSON, CSV i XML), moduł ładujący automatycznie wywnioskuje wszystkie kolumny jako ciągi (w tym pola zagnieżdżone w plikach JSON). W przypadku formatów ze schematem typowanym (Parquet i Avro) program Auto Loader próbkuje podzestaw plików i scala schematy poszczególnych plików. To zachowanie zostało podsumowane w poniższej tabeli:

File format Domyślny typ danych wywnioskowanych
JSON String
CSV String
XML String
Avro Typy zakodowane w schemacie Avro
Parquet Typy zakodowane w schemacie Parquet

Element DataFrameReader platformy Apache Spark używa innego zachowania w przypadku wnioskowania schematu, wybierania typów danych dla kolumn w źródłach JSON, CSV i XML na podstawie przykładowych danych. Aby włączyć to zachowanie za pomocą automatycznego modułu ładującego, ustaw opcję cloudFiles.inferColumnTypes na true.

Uwaga

Podczas wnioskowania schematu dla danych CSV program Auto Loader zakłada, że pliki zawierają nagłówki. Jeśli pliki CSV nie zawierają nagłówków, podaj opcję .option("header", "false"). Ponadto moduł automatycznego ładowania scala schematy wszystkich plików w przykładzie w celu utworzenia schematu globalnego. Funkcja automatycznego modułu ładującego może następnie odczytywać każdy plik zgodnie z nagłówkiem i poprawnie analizować wolumin CSV.

Uwaga

Jeśli kolumna ma różne typy danych w dwóch plikach Parquet, moduł ładujący automatycznie wybiera najszerszy typ. Możesz użyć schemaHints , aby zastąpić ten wybór. Po określeniu wskazówek schematu moduł ładujący automatycznie nie rzutuje kolumny do określonego typu, ale raczej informuje czytelnika Parquet o odczytaniu kolumny jako określonego typu. W przypadku niezgodności kolumna jest ratowana w uratowanej kolumnie danych.

Jak działa ewolucja schematu modułu automatycznego ładowania?

Moduł automatycznego ładowania wykrywa dodanie nowych kolumn podczas przetwarzania danych. Gdy funkcja automatycznego ładowania wykryje nową kolumnę, strumień zatrzymuje się przy użyciu elementu UnknownFieldException. Przed zgłoszeniem tego błędu strumień automatycznie ładujący wykonuje wnioskowanie schematu na najnowszej mikrosadowej partii danych i aktualizuje lokalizację schematu przy użyciu najnowszego schematu, scalając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione.

Usługa Databricks zaleca skonfigurowanie strumieni automatycznego modułu ładującego za pomocą zadań usługi Databricks w celu automatycznego ponownego uruchomienia po wprowadzeniu takich zmian schematu.

Moduł automatycznego ładowania obsługuje następujące tryby ewolucji schematu, które można ustawić w opcji cloudFiles.schemaEvolutionMode:

Tryb Zachowanie podczas odczytywania nowej kolumny
addNewColumns (domyślne) Przesyłanie strumieniowe kończy się niepowodzeniem. Nowe kolumny są dodawane do schematu. Istniejące kolumny nie ewoluują typów danych.
rescue Schemat nigdy nie ewoluował, a strumień nie kończy się niepowodzeniem z powodu zmian schematu. Wszystkie nowe kolumny są rejestrowane w uratowanej kolumnie danych.
failOnNewColumns Przesyłanie strumieniowe kończy się niepowodzeniem. Usługa Stream nie zostanie ponownie uruchomiona, chyba że podany schemat zostanie zaktualizowany lub plik danych o przestępstwach zostanie usunięty.
none Nie rozwija schematu, nowe kolumny są ignorowane, a dane nie są ratowane, chyba że opcja jest ustawiona rescuedDataColumn . Usługa Stream nie kończy się niepowodzeniem z powodu zmian schematu.

Uwaga

addNewColumns tryb jest domyślny, gdy nie podano schematu, ale none jest ustawieniem domyślnym podczas podawania schematu. addNewColumns nie jest dopuszczalne, gdy podany jest schemat strumienia, ale działa, jeśli podajesz swój schemat jako wskazówkę w postaci schematu .

Jak działają partycje z modułem automatycznego ładowania?

Moduł automatycznego ładowania próbuje wywnioskować kolumny partycji z bazowej struktury katalogów danych, jeśli dane są określone w partycjonowaniu w stylu hive. Na przykład ścieżka base_path/event=click/date=2021-04-01/f0.json pliku powoduje wnioskowanie date kolumn i event jako kolumny partycji. Jeśli podstawowa struktura katalogów zawiera sprzeczne partycje programu Hive lub nie zawiera partycjonowania stylu hive, kolumny partycji są ignorowane.

Formaty plików binarnych (binaryFile) i text plików mają stałe schematy danych, ale obsługują wnioskowanie kolumn partycji. Usługa Databricks zaleca ustawienie cloudFiles.schemaLocation tych formatów plików. Zapobiega to wszelkim potencjalnym błędom lub utracie informacji i zapobiega wnioskowaniu kolumn partycji za każdym razem, gdy rozpocznie się automatyczne ładowanie.

Kolumny partycji nie są brane pod uwagę podczas ewolucji schematu. Jeśli masz początkową strukturę katalogów, na przykład base_path/event=click/date=2021-04-01/f0.json, a następnie zacznij otrzymywać nowe pliki jako base_path/event=click/date=2021-04-01/hour=01/f1.json, automatycznie ładujący ignoruje kolumnę godziny. Aby przechwycić informacje dotyczące nowych kolumn partycji, ustaw wartość cloudFiles.partitionColumnsevent,date,hour.

Uwaga

Opcja cloudFiles.partitionColumns przyjmuje rozdzielaną przecinkami listę nazw kolumn. Analizowane są tylko kolumny, które istnieją jako key=value pary w strukturze katalogu.

Jaka jest uratowana kolumna danych?

Gdy moduł automatycznego ładowania wywnioskuje schemat, uratowana kolumna danych zostanie automatycznie dodana do schematu jako _rescued_data. Możesz zmienić nazwę kolumny lub dołączyć ją w przypadkach, w których podajesz schemat, ustawiając opcję rescuedDataColumn.

Uratowana kolumna danych gwarantuje, że kolumny, które nie pasują do schematu, zostaną uratowane zamiast porzuconych. Uratowana kolumna danych zawiera wszystkie dane, które nie są analizowane z następujących powodów:

  • Brak kolumny ze schematu.
  • Niezgodność typów.
  • Niezgodność wielkości liter.

Uratowana kolumna danych zawiera kod JSON zawierający uratowane kolumny i ścieżkę pliku źródłowego rekordu.

Uwaga

Analizatory JSON i CSV obsługują trzy tryby podczas analizowania rekordów: PERMISSIVE, DROPMALFORMEDi FAILFAST. W przypadku użycia razem z elementem rescuedDataColumnniezgodność typów danych nie powoduje porzucenia rekordów w DROPMALFORMED trybie lub zgłaszania błędu w FAILFAST trybie. Tylko uszkodzone rekordy są porzucane lub zgłaszane błędy, takie jak niekompletne lub źle sformułowane pliki JSON lub CSV. Jeśli używasz badRecordsPath podczas analizowania formatu JSON lub CSV, niezgodności typów danych nie są traktowane jako nieprawidłowe rekordy podczas korzystania z elementu rescuedDataColumn. W pliku badRecordsPathsą przechowywane tylko niekompletne i źle sformułowane rekordy JSON lub CSV.

Zmiana zachowania uwzględniającego wielkość liter

Chyba że jest włączona czułość wielkości liter, kolumny abc, Abci ABC są traktowane jako ta sama kolumna na potrzeby wnioskowania schematu. Wybrany przypadek jest dowolny i zależy od przykładowych danych. Możesz użyć wskazówek schematu , aby wymusić użycie tego przypadku. Po zaznaczeniu i wywnioskowaniu schematu moduł automatycznego ładowania nie uwzględnia wariantów wielkości liter, które nie zostały wybrane zgodnie ze schematem.

Po włączeniu uratowanej kolumny danych pola nazwane w przypadku innym niż schemat są ładowane do kolumny _rescued_data . Zmień to zachowanie, ustawiając opcję readerCaseSensitive na false, w takim przypadku moduł automatycznego ładowania odczytuje dane w sposób bez uwzględniania wielkości liter.

Zastępowanie wnioskowania schematu za pomocą wskazówek schematu

Możesz użyć wskazówek schematu, aby wymusić informacje o schemacie, które znasz i których oczekujesz na wywnioskowanym schemacie. Jeśli wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład double zamiast integer), możesz podać dowolną liczbę wskazówek dla typów danych kolumn jako ciąg przy użyciu składni specyfikacji schematu SQL, takiej jak następujące:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Zapoznaj się z dokumentacją dotyczącą typów danych, aby uzyskać listę obsługiwanych typów danych.

Jeśli kolumna nie jest obecna na początku strumienia, możesz również użyć wskazówek schematu, aby dodać kolumnę do wnioskowanego schematu.

Oto przykład wnioskowanego schematu, aby zobaczyć zachowanie ze wskazówkami schematu.

Wywnioskowany schemat:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Określając następujące wskazówki schematu:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

otrzymasz:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Uwaga

Obsługa wskazówek schematów tablic i map jest dostępna w środowisku Databricks Runtime 9.1 LTS i nowszym.

Oto przykład wnioskowanego schematu ze złożonymi typami danych, aby zobaczyć zachowanie ze wskazówkami schematu.

Wywnioskowany schemat:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Określając następujące wskazówki schematu:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

otrzymasz:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Uwaga

Wskazówki schematu są używane tylko wtedy, gdy nie podasz schematu modułowi ładującego automatycznie. Możesz użyć wskazówek schematu, czy cloudFiles.inferColumnTypes jest włączona, czy wyłączona.