Udostępnij za pośrednictwem


Aktualizowanie schematu tabeli usługi Delta Lake

Usługa Delta Lake umożliwia aktualizowanie schematu tabeli. Obsługiwane są następujące typy zmian:

  • Dodawanie nowych kolumn (w dowolnych pozycjach)
  • Zmiana kolejności istniejących kolumn
  • Zmienianie nazw istniejących kolumn

Te zmiany można jawnie wprowadzić przy użyciu języka DDL lub niejawnie przy użyciu języka DML.

Ważne

Aktualizacja schematu tabeli delty to operacja, która powoduje konflikt ze wszystkimi współbieżnymi operacjami zapisu różnicowego.

Po zaktualizowaniu schematu tabeli Delta strumienie odczytane z tej tabeli zakończą się. Jeśli chcesz, aby strumień kontynuował, musisz uruchomić go ponownie. Aby zapoznać się z zalecanymi metodami, zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.

Jawna aktualizacja schematu w celu dodawania kolumn

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Domyślnie wartość null ma wartość true.

Aby dodać kolumnę do zagnieżdżonego pola, użyj:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Jeśli na przykład schemat przed uruchomieniem ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) to:

- root
| - colA
| - colB
| +-field1
| +-field2

schemat po:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Uwaga

Dodawanie zagnieżdżonych kolumn jest obsługiwane tylko dla struktur. Tablice i mapy nie są obsługiwane.

Jawna aktualizacja schematu w celu zmiany komentarza do kolumny lub kolejności

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Aby zmienić kolumnę w zagnieżdżonym polu, użyj:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Jeśli na przykład schemat przed uruchomieniem ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST to:

- root
| - colA
| - colB
| +-field1
| +-field2

schemat po:

- root
| - colA
| - colB
| +-field2
| +-field1

Jawna aktualizacja schematu w celu zastąpienia kolumn

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Na przykład podczas uruchamiania następującego kodu DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

jeśli schemat przed jest:

- root
| - colA
| - colB
| +-field1
| +-field2

schemat po:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Jawne aktualizowanie schematu w celu zmiany nazwy kolumn

Uwaga

Ta funkcja jest dostępna w środowisku Databricks Runtime 10.4 LTS lub nowszym.

Aby zmienić nazwy kolumn bez ponownego zapisywania istniejących danych kolumn, należy włączyć mapowanie kolumn dla tabeli. Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.

Aby zmienić nazwę kolumny:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Aby zmienić nazwę zagnieżdżonego pola:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Na przykład po uruchomieniu następującego polecenia:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Jeśli schemat przed jest:

- root
| - colA
| - colB
| +-field1
| +-field2

Następnie schemat po:

- root
| - colA
| - colB
| +-field001
| +-field2

Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.

Jawne aktualizowanie schematu w celu porzucania kolumn

Uwaga

Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.

Aby usunąć kolumny jako operację tylko dla metadanych bez ponownego zapisywania plików danych, należy włączyć mapowanie kolumn dla tabeli. Zobacz Zmienianie nazwy i usuwanie kolumn za pomocą mapowania kolumn usługi Delta Lake.

Ważne

Usunięcie kolumny z metadanych nie powoduje usunięcia danych bazowych dla kolumny w plikach. Aby przeczyścić usunięte dane kolumn, możesz użyć tabeli REORG do ponownego zapisywania plików. Następnie możesz użyć funkcji VACUUM , aby fizycznie usunąć pliki zawierające porzucone dane kolumny.

Aby usunąć kolumnę:

ALTER TABLE table_name DROP COLUMN col_name

Aby usunąć wiele kolumn:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Jawne aktualizowanie schematu w celu zmiany typu kolumny lub nazwy

Możesz zmienić typ lub nazwę kolumny lub usunąć kolumnę, zapisując ponownie tabelę. W tym celu użyj overwriteSchema opcji .

W poniższym przykładzie pokazano zmianę typu kolumny:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

W poniższym przykładzie pokazano zmianę nazwy kolumny:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Włączanie ewolucji schematu

Możesz włączyć ewolucję schematu, wykonując jedną z następujących czynności:

Usługa Databricks zaleca włączenie ewolucji schematu dla każdej operacji zapisu zamiast ustawiania conf platformy Spark.

Jeśli używasz opcji lub składni w celu włączenia ewolucji schematu w operacji zapisu, ma to pierwszeństwo przed ograniczeniem platformy Spark.

Uwaga

Nie ma klauzuli ewolucji schematu dla INSERT INTO instrukcji.

Włączanie ewolucji schematu dla zapisów w celu dodania nowych kolumn

Kolumny, które znajdują się w zapytaniu źródłowym, ale brakuje w tabeli docelowej, są automatycznie dodawane w ramach transakcji zapisu po włączeniu ewolucji schematu. Zobacz Włączanie ewolucji schematu.

Wielkość liter jest zachowywana podczas dołączania nowej kolumny. Nowe kolumny są dodawane na końcu schematu tabeli. Jeśli dodatkowe kolumny znajdują się w strukturę, są dołączane na końcu struktury w tabeli docelowej.

W poniższym przykładzie pokazano użycie opcji z funkcją mergeSchema automatycznego ładowania. Zobacz Co to jest moduł automatycznego ładowania?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

W poniższym przykładzie pokazano użycie mergeSchema opcji z operacją zapisu wsadowego:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Automatyczna ewolucja schematu dla scalania usługi Delta Lake

Ewolucja schematu umożliwia użytkownikom rozwiązywanie niezgodności schematów między tabelą docelową i źródłową w scalaniu. Obsługuje on następujące dwa przypadki:

  1. Kolumna w tabeli źródłowej nie jest obecna w tabeli docelowej. Nowa kolumna jest dodawana do schematu docelowego, a jego wartości są wstawiane lub aktualizowane przy użyciu wartości źródłowych.
  2. Kolumna w tabeli docelowej nie jest obecna w tabeli źródłowej. Schemat docelowy pozostaje niezmieniony; wartości w dodatkowej kolumnie docelowej są pozostawione bez zmian (dla UPDATE) lub ustawione na NULL wartość (dla INSERT).

Należy ręcznie włączyć automatyczną ewolucję schematu. Zobacz Włączanie ewolucji schematu.

Uwaga

W środowisku Databricks Runtime 12.2 LTS lub nowszym kolumny i pola struktury obecne w tabeli źródłowej mogą być określane przez nazwę w akcjach wstawiania lub aktualizowania. W środowisku Databricks Runtime 11.3 LTS i poniżej można używać tylko INSERT * akcji lub UPDATE SET * akcji do ewolucji schematu z scalania.

W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć ewolucji schematu z strukturami zagnieżdżonym wewnątrz map, takimi jak map<int, struct<a: int, b: int>>.

Składnia ewolucji schematu dla scalania

W środowisku Databricks Runtime 15.2 lub nowszym można określić ewolucję schematu w instrukcji scalania przy użyciu interfejsów API tabel SQL lub delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Przykładowe operacje scalania z ewolucją schematu

Oto kilka przykładów efektów merge operacji z ewolucją schematu i bez nich.

Kolumny Zapytanie (w języku SQL) Zachowanie bez ewolucji schematu (ustawienie domyślne) Zachowanie z ewolucją schematu
Kolumny docelowe: key, value

Kolumny źródłowe: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Schemat tabeli pozostaje niezmieniony; tylko kolumny key, value są aktualizowane/wstawiane. Schemat tabeli został zmieniony na (key, value, new_value). Istniejące rekordy z dopasowaniami są aktualizowane za pomocą elementu value i new_value w źródle. Nowe wiersze są wstawiane ze schematem (key, value, new_value).
Kolumny docelowe: key, old_value

Kolumny źródłowe: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE i INSERT akcje zgłaszają błąd, ponieważ kolumna old_value docelowa nie znajduje się w źródle. Schemat tabeli został zmieniony na (key, old_value, new_value). Istniejące rekordy z dopasowaniami są aktualizowane za pomocą new_value elementu w źródle, pozostawiając old_value niezmienione. Nowe rekordy są wstawiane z określonymi keyelementami , new_valuei NULL dla elementu old_value.
Kolumny docelowe: key, old_value

Kolumny źródłowe: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE zgłasza błąd, ponieważ kolumna new_value nie istnieje w tabeli docelowej. Schemat tabeli został zmieniony na (key, old_value, new_value). Istniejące rekordy z dopasowaniami są aktualizowane za pomocą new_value elementu w źródle, pozostawiając old_value niezmienione, a niedopasowane rekordy zostały NULL wprowadzone dla elementu new_value. Zobacz notatkę (1).
Kolumny docelowe: key, old_value

Kolumny źródłowe: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT zgłasza błąd, ponieważ kolumna new_value nie istnieje w tabeli docelowej. Schemat tabeli został zmieniony na (key, old_value, new_value). Nowe rekordy są wstawiane z określonymi keyelementami , new_valuei NULL dla elementu old_value. Istniejące rekordy zostały NULL wprowadzone w celu new_value pozostawienia old_value niezmienionego. Zobacz notatkę (1).

(1) To zachowanie jest dostępne w środowisku Databricks Runtime 12.2 LTS lub nowszym; Databricks Runtime 11.3 LTS i poniżej błędu w tym warunku.

Wykluczanie kolumn z scalania usługi Delta Lake

W środowisku Databricks Runtime 12.2 LTS i nowszym można użyć EXCEPT klauzul w warunkach scalania, aby jawnie wykluczyć kolumny. Zachowanie słowa kluczowego EXCEPT różni się w zależności od tego, czy jest włączona ewolucja schematu.

Po wyłączeniu EXCEPT ewolucji schematu słowo kluczowe dotyczy listy kolumn w tabeli docelowej i zezwala na wykluczenie kolumn z UPDATE lub INSERT akcji. Wykluczone kolumny są ustawione na nullwartość .

Po włączeniu EXCEPT ewolucji schematu słowo kluczowe ma zastosowanie do listy kolumn w tabeli źródłowej i umożliwia wykluczenie kolumn z ewolucji schematu. Nowa kolumna w źródle, który nie znajduje się w obiekcie docelowym, nie jest dodawana do schematu docelowego, jeśli jest wymieniona w klauzuli EXCEPT . Wykluczone kolumny, które są już obecne w obiekcie docelowym, są ustawione na nullwartość .

W poniższych przykładach pokazano tę składnię:

Kolumny Zapytanie (w języku SQL) Zachowanie bez ewolucji schematu (ustawienie domyślne) Zachowanie z ewolucją schematu
Kolumny docelowe: id, title, last_updated

Kolumny źródłowe: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Nowe wiersze są wstawiane przy użyciu wartości dla id i title. Wykluczone pole last_updated ma wartość null. Pole review jest ignorowane, ponieważ nie znajduje się w obiekcie docelowym. Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Schemat został rozwinięty w celu dodania pola review. Nowe wiersze są wstawiane przy użyciu wszystkich pól źródłowych, z wyjątkiem last_updated tego, które są ustawione na nullwartość .
Kolumny docelowe: id, title, last_updated

Kolumny źródłowe: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT zgłasza błąd, ponieważ kolumna internal_count nie istnieje w tabeli docelowej. Dopasowane wiersze są aktualizowane przez ustawienie last_updated pola na bieżącą datę. Pole review jest dodawane do tabeli docelowej, ale internal_count pole jest ignorowane. Wstawione nowe wiersze mają last_updated ustawioną wartość null.

NullType Obsługa kolumn w aktualizacjach schematu

Ponieważ parquet nie obsługuje NullType, NullType kolumny są porzucane z ramki danych podczas zapisywania w tabelach delty, ale nadal są przechowywane w schemacie. Po odebraniu innego typu danych dla tej kolumny usługa Delta Lake scala schemat z nowym typem danych. Jeśli usługa Delta Lake otrzyma NullType dla istniejącej kolumny, stary schemat zostanie zachowany, a nowa kolumna zostanie porzucona podczas zapisu.

NullType w strumieniu nie jest obsługiwane. Ponieważ należy ustawić schematy podczas korzystania z przesyłania strumieniowego, powinno to być bardzo rzadkie. NullType nie jest również akceptowane w przypadku typów złożonych, takich jak ArrayType i MapType.

Zamienianie schematu tabeli

Domyślnie zastępowanie danych w tabeli nie powoduje zastąpienia schematu. W przypadku zastępowania tabeli przy użyciu polecenia mode("overwrite") bez replaceWherepolecenia można nadal zastąpić schemat zapisywanych danych. Zastąp schemat i partycjonowanie tabeli, ustawiając overwriteSchema opcję na true:

df.write.option("overwriteSchema", "true")

Ważne

Nie można określić overwriteSchema wartości podczas true używania zastępowania partycji dynamicznej.