Update Delta Lake tableschema
Med Delta Lake kan du updateschema för en table. Följande typer av ändringar stöds:
- Lägga till nya columns (vid godtyckliga positioner)
- Ändra ordning på befintliga columns
- Byta namn på befintlig columns
Du kan göra dessa ändringar explicit med DDL eller implicit med DML.
Viktigt!
En update till en Delta-tableschema är en operation som kommer i konflikt med alla samtidiga Delta-skrivåtgärder.
När du update en Delta-tableschema, avslutas strömmar som läser från den table. Om du vill att strömmen ska fortsätta måste du starta om den. Rekommenderade metoder finns i Produktionsöverväganden för strukturerad direktuppspelning.
Uttryckligen updateschema för att lägga till columns
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Som standard är true
nullability .
Om du vill lägga till en column i ett kapslat fält använder du:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Om till exempel schema innan du kör ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
är:
- root
| - colA
| - colB
| +-field1
| +-field2
schema efter är:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Kommentar
Det går bara att lägga till kapslade columns för strukturer. Matriser och kartor stöds inte.
Uttryckligen updateschema för att ändra column kommentar eller ordning
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Om du vill ändra en column i ett kapslat fält använder du:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Om till exempel schema innan du kör ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
är:
- root
| - colA
| - colB
| +-field1
| +-field2
schema efter är:
- root
| - colA
| - colB
| +-field2
| +-field1
Uttryckligen updateschema för att ersätta columns
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Till exempel när du kör följande DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
om schema tidigare är:
- root
| - colA
| - colB
| +-field1
| +-field2
schema efter är:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
uttryckligen updateschema att byta namn på columns
Kommentar
Den här funktionen är tillgänglig i Databricks Runtime 10.4 LTS och senare.
Om du vill byta namn på columns utan att skriva om någon av columnsbefintliga data måste du aktivera column mappning för table. Se Byt namn och ta bort columns med Delta Lake column kartläggning.
Så här byter du namn på en column:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Så här byter du namn på ett kapslat fält:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
När du till exempel kör följande kommando:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Om schema tidigare är:
- root
| - colA
| - colB
| +-field1
| +-field2
Sedan är schema efter:
- root
| - colA
| - colB
| +-field001
| +-field2
Se Byt namn och ta bort columns med Delta Lake column-mappning.
uttryckligen updateschema att släppa columns
Kommentar
Den här funktionen är tillgänglig i Databricks Runtime 11.3 LTS och senare.
Om du vill släppa columns som en endast metadataåtgärd utan att skriva om några datafiler måste du aktivera column mappning för table. Se Byt namn på och ta bort columns med Delta Lake column mappning.
Viktigt!
Om du tar bort en column från metadata, tas inte den underliggande datan bort för column i filerna. Om du vill rensa borttagna column data kan du använda REORG TABLE för att skriva om filer. Du kan sedan använda VACUUM för att fysiskt ta bort de filer som innehåller borttagna column data.
Så här släpper du en column:
ALTER TABLE table_name DROP COLUMN col_name
Så här släpper du flera columns:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
uttryckligen updateschema att ändra column typ eller namn
Du kan ändra en columntyp eller namn eller släppa en column genom att skriva om table. Använd alternativet overwriteSchema
för att göra detta.
I följande exempel visas hur du ändrar en column typ:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
I följande exempel visas hur du ändrar ett column namn:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Aktivera schema utveckling
Du kan aktivera schema utveckling genom att göra något av följande:
-
Set
.option("mergeSchema", "true")
till en Spark DataFrame-write
- ellerwriteStream
-åtgärd. Läs Aktivera schema utveckling för skrivoperationer för att lägga till nya columns. - Använd
MERGE WITH SCHEMA EVOLUTION
syntax. Se Schema evolutionssyntax för sammanslagning. -
Set Spark-konfigurationen
spark.databricks.delta.schema.autoMerge.enabled
tilltrue
för den aktuella SparkSession.
Databricks rekommenderar att du aktiverar schema evolution för varje skrivåtgärd istället för att ställa in en Spark-konfiguration.
När du använder alternativ eller syntax för att aktivera schema utveckling i en skrivåtgärd har detta företräde framför Spark-konfigurationen.
Kommentar
Det finns ingen schema evolutionsklausul för INSERT INTO
-påståenden.
Aktivera schema utveckling för skrivningar för att lägga till nya columns
Columns som finns i källfrågan men som saknas i målet table läggs automatiskt till som en del av en skrivtransaktion när schema evolution är aktiverad. Se Aktivera schema Evolution.
Det aktuella läget bevaras när en ny columnläggs till. Nya columns läggs till i slutet av tableschema. Om de ytterligare columns är i en struct läggs de till i slutet av strukturen i målstrukturen table.
I följande exempel visas hur du använder mergeSchema
alternativet med Automatisk inläsning. Se Vad är automatisk inläsare?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
I följande exempel visas hur du använder mergeSchema
alternativet med en batchskrivningsåtgärd:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Automatisk schema utveckling för Delta Lake-sammanslagning
Schema utveckling gör det möjligt för användare att lösa schema missmatchningar mellan mål- och käll-table i sammanfogning. Den hanterar följande två fall:
- En column i källan table finns inte i målet table. Den nya column läggs till i målet schemaoch dess values infogas eller uppdateras med hjälp av källan values.
- En column i mål table finns inte i källan table. Målet schema lämnas oförändrat. values i det ytterligare målet column lämnas antingen oförändrade (för
UPDATE
) eller setNULL
(förINSERT
).
Du måste aktivera automatisk schema-utveckling manuellt. Se Aktivera schema evolution.
Kommentar
I Databricks Runtime 12.2 LTS och senare kan columns och strukturfält som finns i källan table specificeras med namn i insert eller update-åtgärder. I Databricks Runtime 11.3 LTS och nedan kan endast INSERT *
eller UPDATE SET *
åtgärder användas för schema utveckling med sammanslagning.
I Databricks Runtime 13.3 LTS och senare kan du använda schema utveckling med structs kapslade inuti kartor, till exempel map<int, struct<a: int, b: int>>
.
Schema evolutionssyntax för sammanslagning
I Databricks Runtime 15.2 och senare kan du ange schema-utveckling i en sammanfogningsinstruktion genom att använda SQL- eller Delta table-API:er.
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Exempelåtgärder för sammanslagning med schema utveckling
Här följer några exempel på effekterna av merge
drift med och utan schema utveckling.
Columns | Fråga (i SQL) | Beteende utan schema utveckling (standardinställning) | Beteende i samband med schema-utveckling |
---|---|---|---|
Mål columns: key, value Källa columns: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Den tableschema förblir oförändrad; endast columnskey , value uppdateras/infogas. |
table
schema ändras till (key, value, new_value) . Befintliga poster med matchningar uppdateras med value och new_value i källan. Nya rader infogas med schema(key, value, new_value) . |
Mål columns: key, old_value Källa columns: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE - och INSERT -åtgärder utlöser ett fel eftersom mål-columnold_value inte finns i källan. |
table
schema ändras till (key, old_value, new_value) . Befintliga poster med matchningar uppdateras med new_value i källan lämnar old_value oförändrade. Nya poster infogas med angiven key , new_value och NULL för old_value . |
Mål columns: key, old_value Källa columns: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE utlöser ett fel eftersom columnnew_value inte finns i målet table. |
table
schema ändras till (key, old_value, new_value) . Befintliga poster med matchningar uppdateras med new_value i källan lämnar old_value oförändrade och omatchade poster har NULL angetts för new_value . Se anteckning (1). |
Mål columns: key, old_value Källa columns: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT utlöser ett fel eftersom columnnew_value inte finns i målet table. |
table
schema ändras till (key, old_value, new_value) . Nya poster infogas med angiven key , new_value och NULL för old_value . Befintliga poster har NULL angetts för new_value att lämna old_value oförändrade. Se anteckning (1). |
(1) Det här beteendet är tillgängligt i Databricks Runtime 12.2 LTS och senare. Databricks Runtime 11.3 LTS och nedanstående fel i det här villkoret.
Exkludera columns med Delta Lake-sammanslagning
I Databricks Runtime 12.2 LTS och senare kan du använda EXCEPT
-satser i kopplingsvillkor för att uttryckligen undanta columns. Beteendet för nyckelordet EXCEPT
varierar beroende på om schema utveckling är aktiverad eller inte.
Med schema utveckling inaktiverad gäller nyckelordet EXCEPT
för list av columns i mål table och tillåter att columns undantas från UPDATE
eller INSERT
åtgärder. Exkluderade är set till null
från columns.
När utvecklingen av schema är aktiverad gäller nyckelordet EXCEPT
för list av columns i källan table och tillåter uteslutandet av columns från utvecklingen av schema. En ny column i källan som inte finns i måldokumentet läggs inte till i schema i måldokumentet om den finns med i EXCEPT
-klausulen. Exkluderar columns som redan finns i målet från set till null
.
Följande exempel visar den här syntaxen:
Columns | Fråga (i SQL) | Beteende utan schema utveckling (standard) | Utvecklingen av beteende med schema |
---|---|---|---|
Mål columns: id, title, last_updated Källa columns: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum. Nya rader infogas med values för id och title . Det exkluderade fältet last_updated är set till null . Fältet review ignoreras eftersom det inte finns i målet. |
Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum.
Schema har utvecklats för att lägga till fältet review . Nya rader infogas med alla källfält utom last_updated , vilket är set till null . |
Mål columns: id, title, last_updated Källa columns: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT utlöser ett fel eftersom columninternal_count inte finns i målet table. |
Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum. Fältet review läggs till i målfältet table, men fältet internal_count ignoreras. Nya infogade rader har last_updated set till null . |
Hantera NullType
columns i schema uppdateringar
Eftersom Parquet inte stöder NullType
tas NullType
columns bort från DataFrame när du skriver till Delta tables, men lagras fortfarande i schema. När en annan datatyp tas emot för den columnsammanfogar Delta Lake schema till den nya datatypen. Om Delta Lake tar emot en NullType
för en befintlig column, behålls den gamla schema och den nya column tas bort vid skrivning.
NullType
i strömning stöds inte. Eftersom du måste set scheman när du använder strömmande bör detta vara mycket sällsynt.
NullType
accepteras inte heller för komplexa typer som ArrayType
och MapType
.
Ersätt tableschema
Om du skriver över data i en table skrivs inte schemaöver som standard. När du skriver över en table med mode("overwrite")
utan replaceWhere
kanske du fortfarande vill skriva över schema av de data som skrivs. Du ersätter schema och partitionering av table genom att ange alternativet overwriteSchema
till true
:
df.write.option("overwriteSchema", "true")
Viktigt!
Du kan inte ange overwriteSchema
som true
när du använder dynamisk partition skriv över.