Aktualizowanie schematu tabeli usługi Delta Lake
Usługa Delta Lake umożliwia aktualizowanie schematu tabeli. Obsługiwane są następujące typy zmian:
- Dodawanie nowych kolumn (w dowolnych pozycjach)
- Zmiana kolejności istniejących kolumn
- Zmienianie nazw istniejących kolumn
Te zmiany można jawnie wprowadzić przy użyciu języka DDL lub niejawnie przy użyciu języka DML.
Ważne
Aktualizacja schematu tabeli delty to operacja, która powoduje konflikt ze wszystkimi współbieżnymi operacjami zapisu różnicowego.
Po zaktualizowaniu schematu tabeli Delta strumienie odczytane z tej tabeli zakończą się. Jeśli chcesz, aby strumień kontynuował, musisz uruchomić go ponownie. Aby zapoznać się z zalecanymi metodami, zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.
Jawna aktualizacja schematu w celu dodawania kolumn
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Domyślnie wartość null ma wartość true
.
Aby dodać kolumnę do zagnieżdżonego pola, użyj:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Jeśli na przykład schemat przed uruchomieniem ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
to:
- root
| - colA
| - colB
| +-field1
| +-field2
schemat po:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Uwaga
Dodawanie zagnieżdżonych kolumn jest obsługiwane tylko dla struktur. Tablice i mapy nie są obsługiwane.
Jawna aktualizacja schematu w celu zmiany komentarza do kolumny lub kolejności
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Aby zmienić kolumnę w zagnieżdżonym polu, użyj:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Jeśli na przykład schemat przed uruchomieniem ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
to:
- root
| - colA
| - colB
| +-field1
| +-field2
schemat po:
- root
| - colA
| - colB
| +-field2
| +-field1
Jawna aktualizacja schematu w celu zastąpienia kolumn
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Na przykład podczas uruchamiania następującego kodu DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
jeśli schemat przed jest:
- root
| - colA
| - colB
| +-field1
| +-field2
schemat po:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Jawne aktualizowanie schematu w celu zmiany nazwy kolumn
Uwaga
Ta funkcja jest dostępna w środowisku Databricks Runtime 10.4 LTS lub nowszym.
Aby zmienić nazwy kolumn bez ponownego zapisywania istniejących danych kolumn, należy włączyć mapowanie kolumn dla tabeli. Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.
Aby zmienić nazwę kolumny:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Aby zmienić nazwę zagnieżdżonego pola:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Na przykład po uruchomieniu następującego polecenia:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Jeśli schemat przed jest:
- root
| - colA
| - colB
| +-field1
| +-field2
Następnie schemat po:
- root
| - colA
| - colB
| +-field001
| +-field2
Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.
Jawne aktualizowanie schematu w celu porzucania kolumn
Uwaga
Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.
Aby usunąć kolumny jako operację tylko dla metadanych bez ponownego zapisywania plików danych, należy włączyć mapowanie kolumn dla tabeli. Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.
Ważne
Usunięcie kolumny z metadanych nie powoduje usunięcia danych bazowych dla kolumny w plikach. Aby przeczyścić usunięte dane kolumn, możesz użyć tabeli REORG do ponownego zapisywania plików. Następnie możesz użyć funkcji VACUUM , aby fizycznie usunąć pliki zawierające porzucone dane kolumny.
Aby usunąć kolumnę:
ALTER TABLE table_name DROP COLUMN col_name
Aby usunąć wiele kolumn:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Jawne aktualizowanie schematu w celu zmiany typu kolumny lub nazwy
Możesz zmienić typ lub nazwę kolumny lub usunąć kolumnę, zapisując ponownie tabelę. W tym celu użyj overwriteSchema
opcji .
W poniższym przykładzie pokazano zmianę typu kolumny:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
W poniższym przykładzie pokazano zmianę nazwy kolumny:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Włączanie ewolucji schematu
Możesz włączyć ewolucję schematu, wykonując jedną z następujących czynności:
.option("mergeSchema", "true")
Ustaw element na ramkęwrite
danych platformy Spark lubwriteStream
operację. Zobacz Włączanie ewolucji schematu dla zapisów w celu dodania nowych kolumn.- Użyj
MERGE WITH SCHEMA EVOLUTION
składni. Zobacz Składnia ewolucji schematu, aby scalić. - Ustaw wartość conf
spark.databricks.delta.schema.autoMerge.enabled
true
platformy Spark na bieżącą platformę SparkSession.
Usługa Databricks zaleca włączenie ewolucji schematu dla każdej operacji zapisu zamiast ustawiania conf platformy Spark.
Jeśli używasz opcji lub składni w celu włączenia ewolucji schematu w operacji zapisu, ma to pierwszeństwo przed ograniczeniem platformy Spark.
Uwaga
Nie ma klauzuli ewolucji schematu dla INSERT INTO
instrukcji.
Włączanie ewolucji schematu dla zapisów w celu dodania nowych kolumn
Kolumny, które znajdują się w zapytaniu źródłowym, ale brakuje w tabeli docelowej, są automatycznie dodawane w ramach transakcji zapisu po włączeniu ewolucji schematu. Zobacz Włączanie ewolucji schematu.
Wielkość liter jest zachowywana podczas dołączania nowej kolumny. Nowe kolumny są dodawane na końcu schematu tabeli. Jeśli dodatkowe kolumny znajdują się w strukturę, są dołączane na końcu struktury w tabeli docelowej.
W poniższym przykładzie pokazano użycie opcji z funkcją mergeSchema
automatycznego ładowania. Zobacz Co to jest moduł automatycznego ładowania?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
W poniższym przykładzie pokazano użycie mergeSchema
opcji z operacją zapisu wsadowego:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Automatyczna ewolucja schematu dla scalania usługi Delta Lake
Ewolucja schematu umożliwia użytkownikom rozwiązywanie niezgodności schematów między tabelą docelową i źródłową w scalaniu. Obsługuje on następujące dwa przypadki:
- Kolumna w tabeli źródłowej nie jest obecna w tabeli docelowej. Nowa kolumna jest dodawana do schematu docelowego, a jego wartości są wstawiane lub aktualizowane przy użyciu wartości źródłowych.
- Kolumna w tabeli docelowej nie jest obecna w tabeli źródłowej. Schemat docelowy pozostaje niezmieniony; wartości w dodatkowej kolumnie docelowej są pozostawione bez zmian (dla
UPDATE
) lub ustawione naNULL
wartość (dlaINSERT
).
Należy ręcznie włączyć automatyczną ewolucję schematu. Zobacz Włączanie ewolucji schematu.
Uwaga
W środowisku Databricks Runtime 12.2 LTS lub nowszym kolumny i pola struktury obecne w tabeli źródłowej mogą być określane przez nazwę w akcjach wstawiania lub aktualizowania. W środowisku Databricks Runtime 11.3 LTS i poniżej można używać tylko INSERT *
akcji lub UPDATE SET *
akcji do ewolucji schematu z scalania.
W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć ewolucji schematu z strukturami zagnieżdżonym wewnątrz map, takimi jak map<int, struct<a: int, b: int>>
.
Składnia ewolucji schematu dla scalania
W środowisku Databricks Runtime 15.2 lub nowszym można określić ewolucję schematu w instrukcji scalania przy użyciu interfejsów API tabel SQL lub delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Przykładowe operacje scalania z ewolucją schematu
Oto kilka przykładów efektów merge
operacji z ewolucją schematu i bez nich.
Kolumny | Zapytanie (w języku SQL) | Zachowanie bez ewolucji schematu (ustawienie domyślne) | Zachowanie z ewolucją schematu |
---|---|---|---|
Kolumny docelowe: key, value Kolumny źródłowe: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Schemat tabeli pozostaje niezmieniony; tylko kolumny key , value są aktualizowane/wstawiane. |
Schemat tabeli został zmieniony na (key, value, new_value) . Istniejące rekordy z dopasowaniami są aktualizowane za pomocą elementu value i new_value w źródle. Nowe wiersze są wstawiane ze schematem (key, value, new_value) . |
Kolumny docelowe: key, old_value Kolumny źródłowe: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE i INSERT akcje zgłaszają błąd, ponieważ kolumna old_value docelowa nie znajduje się w źródle. |
Schemat tabeli został zmieniony na (key, old_value, new_value) . Istniejące rekordy z dopasowaniami są aktualizowane za pomocą new_value elementu w źródle, pozostawiając old_value niezmienione. Nowe rekordy są wstawiane z określonymi key elementami , new_value i NULL dla elementu old_value . |
Kolumny docelowe: key, old_value Kolumny źródłowe: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE zgłasza błąd, ponieważ kolumna new_value nie istnieje w tabeli docelowej. |
Schemat tabeli został zmieniony na (key, old_value, new_value) . Istniejące rekordy z dopasowaniami są aktualizowane za pomocą new_value elementu w źródle, pozostawiając old_value niezmienione, a niedopasowane rekordy zostały NULL wprowadzone dla elementu new_value . Zobacz notatkę (1). |
Kolumny docelowe: key, old_value Kolumny źródłowe: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT zgłasza błąd, ponieważ kolumna new_value nie istnieje w tabeli docelowej. |
Schemat tabeli został zmieniony na (key, old_value, new_value) . Nowe rekordy są wstawiane z określonymi key elementami , new_value i NULL dla elementu old_value . Istniejące rekordy zostały NULL wprowadzone w celu new_value pozostawienia old_value niezmienionego. Zobacz notatkę (1). |
(1) To zachowanie jest dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym; Databricks Runtime 11.3 LTS i poniżej błędu w tym warunku.
Wykluczanie kolumn z scalania usługi Delta Lake
W środowisku Databricks Runtime 12.2 LTS i nowszym można użyć EXCEPT
klauzul w warunkach scalania, aby jawnie wykluczyć kolumny. Zachowanie słowa kluczowego EXCEPT
różni się w zależności od tego, czy jest włączona ewolucja schematu.
Po wyłączeniu EXCEPT
ewolucji schematu słowo kluczowe dotyczy listy kolumn w tabeli docelowej i zezwala na wykluczenie kolumn z UPDATE
lub INSERT
akcji. Wykluczone kolumny są ustawione na null
wartość .
Po włączeniu EXCEPT
ewolucji schematu słowo kluczowe ma zastosowanie do listy kolumn w tabeli źródłowej i umożliwia wykluczenie kolumn z ewolucji schematu. Nowa kolumna w źródle, który nie znajduje się w obiekcie docelowym, nie jest dodawana do schematu docelowego, jeśli jest wymieniona w klauzuli EXCEPT
. Wykluczone kolumny, które są już obecne w obiekcie docelowym, są ustawione na null
wartość .
W poniższych przykładach pokazano tę składnię:
Kolumny | Zapytanie (w języku SQL) | Zachowanie bez ewolucji schematu (ustawienie domyślne) | Zachowanie z ewolucją schematu |
---|---|---|---|
Kolumny docelowe: id, title, last_updated Kolumny źródłowe: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Nowe wiersze są wstawiane przy użyciu wartości dla id i title . Wykluczone pole last_updated ma wartość null . Pole review jest ignorowane, ponieważ nie znajduje się w obiekcie docelowym. |
Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Schemat został rozwinięty w celu dodania pola review . Nowe wiersze są wstawiane przy użyciu wszystkich pól źródłowych, z wyjątkiem last_updated tego, które są ustawione na null wartość . |
Kolumny docelowe: id, title, last_updated Kolumny źródłowe: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT zgłasza błąd, ponieważ kolumna internal_count nie istnieje w tabeli docelowej. |
Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Pole review jest dodawane do tabeli docelowej, ale internal_count pole jest ignorowane. Wstawione nowe wiersze mają last_updated ustawioną wartość null . |
NullType
Obsługa kolumn w aktualizacjach schematu
Ponieważ parquet nie obsługuje NullType
, NullType
kolumny są porzucane z ramki danych podczas zapisywania w tabelach delty, ale nadal są przechowywane w schemacie. Po odebraniu innego typu danych dla tej kolumny usługa Delta Lake scala schemat z nowym typem danych. Jeśli usługa Delta Lake otrzyma NullType
dla istniejącej kolumny, stary schemat zostanie zachowany, a nowa kolumna zostanie porzucona podczas zapisu.
NullType
w strumieniu nie jest obsługiwane. Ponieważ należy ustawić schematy podczas korzystania z przesyłania strumieniowego, powinno to być bardzo rzadkie. NullType
nie jest również akceptowane w przypadku typów złożonych, takich jak ArrayType
i MapType
.
Zamienianie schematu tabeli
Domyślnie zastępowanie danych w tabeli nie powoduje zastąpienia schematu. W przypadku zastępowania tabeli przy użyciu polecenia mode("overwrite")
bez replaceWhere
polecenia można nadal zastąpić schemat zapisywanych danych. Zastąp schemat i partycjonowanie tabeli, ustawiając overwriteSchema
opcję na true
:
df.write.option("overwriteSchema", "true")
Ważne
Nie można określić overwriteSchema
wartości podczas true
używania zastępowania partycji dynamicznej.