Aktualizace schématu tabulky Delta Lake
Delta Lake umožňuje aktualizovat schéma tabulky. Jsou podporovány následující typy změn:
- Přidání nových sloupců (na libovolných pozicích)
- Změna pořadí existujících sloupců
- Přejmenování existujících sloupců
Tyto změny můžete provést explicitně pomocí DDL nebo implicitně pomocí jazyka DML.
Důležité
Aktualizace schématu tabulky Delta je operace, která je v konfliktu se všemi souběžnými operacemi zápisu Delta.
Při aktualizaci schématu tabulky Delta se datové proudy čtené z této tabulky ukončí. Pokud chcete, aby datový proud pokračoval, musíte ho restartovat. Doporučené metody najdete v tématu Důležité informace o produkčním prostředí pro strukturované streamování.
Explicitní aktualizace schématu za účelem přidání sloupců
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Ve výchozím nastavení je nullability .true
Pokud chcete přidat sloupec do vnořeného pole, použijte:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Pokud je například schéma před spuštěním ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
následující:
- root
| - colA
| - colB
| +-field1
| +-field2
schéma je následující:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Poznámka:
Přidávání vnořených sloupců je podporováno pouze pro struktury. Pole a mapy nejsou podporovány.
Explicitní aktualizace schématu pro změnu komentáře nebo řazení sloupců
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Pokud chcete změnit sloupec v vnořeném poli, použijte:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Pokud je například schéma před spuštěním ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
následující:
- root
| - colA
| - colB
| +-field1
| +-field2
schéma je následující:
- root
| - colA
| - colB
| +-field2
| +-field1
Explicitní aktualizace schématu pro nahrazení sloupců
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Například při spuštění následujícího DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
pokud je schéma dříve:
- root
| - colA
| - colB
| +-field1
| +-field2
schéma je následující:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Explicitní aktualizace schématu pro přejmenování sloupců
Poznámka:
Tato funkce je dostupná ve verzi Databricks Runtime 10.4 LTS a vyšší.
Pokud chcete přejmenovat sloupce bez přepsání existujících dat sloupců, musíte pro tabulku povolit mapování sloupců. Viz Přejmenování a vyřazení sloupců s mapováním sloupců Delta Lake.
Přejmenování sloupce:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Přejmenování vnořeného pole:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Například při spuštění následujícího příkazu:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Pokud je schéma dříve:
- root
| - colA
| - colB
| +-field1
| +-field2
Potom schéma následuje:
- root
| - colA
| - colB
| +-field001
| +-field2
Viz Přejmenování a vyřazení sloupců s mapováním sloupců Delta Lake.
Explicitní aktualizace schématu pro vyřazení sloupců
Poznámka:
Tato funkce je dostupná ve verzi Databricks Runtime 11.3 LTS a vyšší.
Pokud chcete sloupce vypustit jako operaci jen pro metadata, aniž byste museli přepisovat datové soubory, musíte pro tabulku povolit mapování sloupců. Viz Přejmenování a vyřazení sloupců s mapováním sloupců Delta Lake.
Důležité
Vyřazení sloupce z metadat neodstraní podkladová data sloupce v souborech. Pokud chcete vyprázdnit zahozená data sloupce, můžete k přepsání souborů použít REORG TABLE . Pomocí funkce VAKUA pak můžete fyzicky odstranit soubory, které obsahují vynechaná data sloupce.
Odstranění sloupce:
ALTER TABLE table_name DROP COLUMN col_name
Odstranění více sloupců:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Explicitní aktualizace schématu pro změnu typu nebo názvu sloupce
Typ nebo název sloupce můžete změnit tak, že sloupec přepíšete. K tomu použijte overwriteSchema
možnost.
Následující příklad ukazuje změnu typu sloupce:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Následující příklad ukazuje změnu názvu sloupce:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Povolení vývoje schématu
Vývoj schématu můžete povolit jedním z následujících způsobů:
.option("mergeSchema", "true")
Nastavte datový rámecwrite
Sparku nebowriteStream
operaci. Další informace o přidání nových sloupců najdete v tématu Povolení vývoje schématu pro zápisy.- Použijte
MERGE WITH SCHEMA EVOLUTION
syntaxi. Viz syntaxe vývoje schématu pro sloučení. - Nastavte soubor Spark conf
spark.databricks.delta.schema.autoMerge.enabled
protrue
aktuální SparkSession.
Databricks doporučuje povolit vývoj schématu pro každou operaci zápisu místo nastavení souboru Spark.
Pokud používáte možnosti nebo syntaxi k povolení vývoje schématu v operaci zápisu, má přednost před souborem Spark Conf.
Poznámka:
Pro příkazy neexistuje žádná klauzule INSERT INTO
vývoje schématu.
Povolení vývoje schématu pro zápisy pro přidání nových sloupců
Sloupce, které jsou přítomné ve zdrojovém dotazu, ale chybí v cílové tabulce, se automaticky přidají jako součást transakce zápisu, když je povolen vývoj schématu. Viz Vývoj schématu povolení.
Při připojování nového sloupce se zachovají velká a malá písmena. Na konec schématu tabulky se přidají nové sloupce. Pokud jsou další sloupce ve struktuře, připojí se ke konci struktury v cílové tabulce.
Následující příklad ukazuje použití mergeSchema
možnosti s automatickým zavaděčem. Podívejte se, co je automatický zavaděč?
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Následující příklad ukazuje použití mergeSchema
možnosti s dávkovou operací zápisu:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Automatický vývoj schématu pro sloučení Delta Lake
Vývoj schématu umožňuje uživatelům vyřešit neshody schémat mezi cílovou a zdrojnou tabulkou při sloučení. Zpracovává následující dva případy:
- Sloupec ve zdrojové tabulce není v cílové tabulce. Nový sloupec se přidá do cílového schématu a jeho hodnoty se vloží nebo aktualizují pomocí zdrojových hodnot.
- Sloupec v cílové tabulce není ve zdrojové tabulce. Cílové schéma zůstane beze změny; hodnoty v dalším cílovém sloupci zůstanou beze změny (pro
UPDATE
) nebo jsou nastaveny naNULL
hodnotu (proINSERT
).
Musíte ručně povolit automatický vývoj schématu. Viz Vývoj schématu povolení.
Poznámka:
Ve službě Databricks Runtime 12.2 LTS a vyšších polích sloupců a struktur, která jsou ve zdrojové tabulce, je možné zadat podle názvu v akcích vložení nebo aktualizace. V Databricks Runtime 11.3 LTS a níže je možné použít pouze INSERT *
nebo UPDATE SET *
akce pro vývoj schématu při sloučení.
Ve službě Databricks Runtime 13.3 LTS a novějších můžete použít vývoj schématu s strukturami vnořenými uvnitř map, například map<int, struct<a: int, b: int>>
.
Syntaxe vývoje schématu pro sloučení
V Databricks Runtime 15.2 a novějších můžete zadat vývoj schématu v příkazu sloučení pomocí rozhraní SQL nebo Delta table API:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Příklady operací sloučení s vývojem schématu
Tady je několik příkladů vlivu merge
operací s vývojem schématu a bez nich.
Sloupce | Dotaz (v SQL) | Chování bez vývoje schématu (výchozí) | Chování s vývojem schématu |
---|---|---|---|
Cílové sloupce: key, value Zdrojové sloupce: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Schéma tabulky zůstává beze změny; pouze sloupce key , value jsou aktualizovány/vloženy. |
Schéma tabulky se změní na (key, value, new_value) . Existující záznamy se shodou se aktualizují pomocí value zdroje a new_value ve zdroji. Nové řádky se vloží se schématem (key, value, new_value) . |
Cílové sloupce: key, old_value Zdrojové sloupce: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE a INSERT akce vyvolá chybu, protože cílový sloupec old_value není ve zdroji. |
Schéma tabulky se změní na (key, old_value, new_value) . Stávající záznamy se shodami se aktualizují tak, aby new_value zdroj zůstal old_value beze změny. Nové záznamy jsou vloženy se zadaným key , new_value a NULL pro old_value . |
Cílové sloupce: key, old_value Zdrojové sloupce: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE vyvolá chybu, protože sloupec new_value v cílové tabulce neexistuje. |
Schéma tabulky se změní na (key, old_value, new_value) . Existující záznamy se shodami se aktualizují tak, aby new_value ve zdroji zůstaly old_value beze změny a byly zadány NULL chybějící záznamy pro new_value . Viz poznámka (1). |
Cílové sloupce: key, old_value Zdrojové sloupce: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT vyvolá chybu, protože sloupec new_value v cílové tabulce neexistuje. |
Schéma tabulky se změní na (key, old_value, new_value) . Nové záznamy jsou vloženy se zadaným key , new_value a NULL pro old_value . Existující záznamy byly zadány NULL pro new_value ponechání old_value beze změny. Viz poznámka (1). |
(1) Toto chování je k dispozici v Databricks Runtime 12.2 LTS a vyšší; Databricks Runtime 11.3 LTS a následující chyba v této podmínce
Vyloučení sloupců se sloučením Delta Lake
V Databricks Runtime 12.2 LTS a novějších můžete k explicitní vyloučení sloupců použít EXCEPT
klauzule v podmínkách sloučení. Chování klíčového EXCEPT
slova se liší v závislosti na tom, zda je povolen vývoj schématu.
Když je vývoj schématu zakázaný, EXCEPT
klíčové slovo se vztahuje na seznam sloupců v cílové tabulce a umožňuje vyloučit sloupce z UPDATE
nebo INSERT
akce. Vyloučené sloupce jsou nastaveny na null
hodnotu .
S povoleným vývojem schématu EXCEPT
se klíčové slovo vztahuje na seznam sloupců ve zdrojové tabulce a umožňuje vyloučit sloupce z vývoje schématu. Nový sloupec ve zdroji, který není v cíli, není přidán do cílového schématu, pokud je uveden v klauzuli EXCEPT
. Vyloučené sloupce, které jsou již v cíli, jsou nastaveny na null
hodnotu .
Následující příklady ukazují tuto syntaxi:
Sloupce | Dotaz (v SQL) | Chování bez vývoje schématu (výchozí) | Chování s vývojem schématu |
---|---|---|---|
Cílové sloupce: id, title, last_updated Zdrojové sloupce: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Odpovídající řádky se aktualizují nastavením last_updated pole na aktuální datum. Nové řádky se vkládají pomocí hodnot pro id a title . Vyloučené pole last_updated je nastaveno na null hodnotu . Pole review je ignorováno, protože není v cíli. |
Odpovídající řádky se aktualizují nastavením last_updated pole na aktuální datum. Schéma se vyvíjí tak, aby přidalo pole review . Nové řádky se vloží pomocí všech zdrojových polí s výjimkou last_updated těch, které jsou nastaveny na null . |
Cílové sloupce: id, title, last_updated Zdrojové sloupce: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT vyvolá chybu, protože sloupec internal_count v cílové tabulce neexistuje. |
Odpovídající řádky se aktualizují nastavením last_updated pole na aktuální datum. Pole review se přidá do cílové tabulky, ale internal_count pole se ignoruje. Nové vložené řádky jsou nastaveny last_updated na null hodnotu . |
Práce se NullType
sloupci v aktualizacích schématu
Vzhledem k tomu, že Parquet nepodporuje NullType
, NullType
sloupce se při zápisu do tabulek Delta zahodí z datového rámce, ale stále jsou uložené ve schématu. Při přijetí jiného datového typu pro tento sloupec Delta Lake schéma sloučí s novým datovým typem. Pokud Delta Lake obdrží NullType
pro existující sloupec, staré schéma se zachovají a během zápisu se nový sloupec zahodí.
NullType
streamování se nepodporuje. Vzhledem k tomu, že při použití streamování je nutné nastavit schémata, mělo by to být velmi vzácné. NullType
pro komplexní typy, jako ArrayType
MapType
je a .
Nahrazení schématu tabulky
Ve výchozím nastavení přepsání dat v tabulce schéma nepřepíše. Při přepsání tabulky pomocí mode("overwrite")
bez replaceWhere
, můžete stále chtít přepsat schéma zapisovaných dat. Schéma a dělení tabulky nahradíte nastavením overwriteSchema
možnosti :true
df.write.option("overwriteSchema", "true")
Důležité
Nelze určit overwriteSchema
, jako true
při použití dynamického přepsání oddílu.