Dela via


Update Delta Lake tableschema

Med Delta Lake kan du updateschema för en table. Följande typer av ändringar stöds:

  • Lägga till nya columns (vid godtyckliga positioner)
  • Ändra ordning på befintliga columns
  • Byta namn på befintlig columns

Du kan göra dessa ändringar explicit med DDL eller implicit med DML.

Viktigt!

En update till en Delta-tableschema är en operation som kommer i konflikt med alla samtidiga Delta-skrivåtgärder.

När du update en Delta-tableschema, avslutas strömmar som läser från den table. Om du vill att strömmen ska fortsätta måste du starta om den. Rekommenderade metoder finns i Produktionsöverväganden för strukturerad direktuppspelning.

Uttryckligen updateschema för att lägga till columns

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Som standard är truenullability .

Om du vill lägga till en column i ett kapslat fält använder du:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Om till exempel schema innan du kör ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) är:

- root
| - colA
| - colB
| +-field1
| +-field2

schema efter är:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Kommentar

Det går bara att lägga till kapslade columns för strukturer. Matriser och kartor stöds inte.

Uttryckligen updateschema för att ändra column kommentar eller ordning

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Om du vill ändra en column i ett kapslat fält använder du:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Om till exempel schema innan du kör ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST är:

- root
| - colA
| - colB
| +-field1
| +-field2

schema efter är:

- root
| - colA
| - colB
| +-field2
| +-field1

Uttryckligen updateschema för att ersätta columns

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Till exempel när du kör följande DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

om schema tidigare är:

- root
| - colA
| - colB
| +-field1
| +-field2

schema efter är:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

uttryckligen updateschema att byta namn på columns

Kommentar

Den här funktionen är tillgänglig i Databricks Runtime 10.4 LTS och senare.

Om du vill byta namn på columns utan att skriva om någon av columnsbefintliga data måste du aktivera column mappning för table. Se Byt namn och ta bort columns med Delta Lake column kartläggning.

Så här byter du namn på en column:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Så här byter du namn på ett kapslat fält:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

När du till exempel kör följande kommando:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Om schema tidigare är:

- root
| - colA
| - colB
| +-field1
| +-field2

Sedan är schema efter:

- root
| - colA
| - colB
| +-field001
| +-field2

Se Byt namn och ta bort columns med Delta Lake column-mappning.

uttryckligen updateschema att släppa columns

Kommentar

Den här funktionen är tillgänglig i Databricks Runtime 11.3 LTS och senare.

Om du vill släppa columns som en endast metadataåtgärd utan att skriva om några datafiler måste du aktivera column mappning för table. Se Byt namn på och ta bort columns med Delta Lake column mappning.

Viktigt!

Om du tar bort en column från metadata, tas inte den underliggande datan bort för column i filerna. Om du vill rensa borttagna column data kan du använda REORG TABLE för att skriva om filer. Du kan sedan använda VACUUM för att fysiskt ta bort de filer som innehåller borttagna column data.

Så här släpper du en column:

ALTER TABLE table_name DROP COLUMN col_name

Så här släpper du flera columns:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

uttryckligen updateschema att ändra column typ eller namn

Du kan ändra en columntyp eller namn eller släppa en column genom att skriva om table. Använd alternativet overwriteSchema för att göra detta.

I följande exempel visas hur du ändrar en column typ:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

I följande exempel visas hur du ändrar ett column namn:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Aktivera schema utveckling

Du kan aktivera schema utveckling genom att göra något av följande:

Databricks rekommenderar att du aktiverar schema evolution för varje skrivåtgärd istället för att ställa in en Spark-konfiguration.

När du använder alternativ eller syntax för att aktivera schema utveckling i en skrivåtgärd har detta företräde framför Spark-konfigurationen.

Kommentar

Det finns ingen schema evolutionsklausul för INSERT INTO-påståenden.

Aktivera schema utveckling för skrivningar för att lägga till nya columns

Columns som finns i källfrågan men som saknas i målet table läggs automatiskt till som en del av en skrivtransaktion när schema evolution är aktiverad. Se Aktivera schema Evolution.

Det aktuella läget bevaras när en ny columnläggs till. Nya columns läggs till i slutet av tableschema. Om de ytterligare columns är i en struct läggs de till i slutet av strukturen i målstrukturen table.

I följande exempel visas hur du använder mergeSchema alternativet med Automatisk inläsning. Se Vad är automatisk inläsare?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

I följande exempel visas hur du använder mergeSchema alternativet med en batchskrivningsåtgärd:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Automatisk schema utveckling för Delta Lake-sammanslagning

Schema utveckling gör det möjligt för användare att lösa schema missmatchningar mellan mål- och käll-table i sammanfogning. Den hanterar följande två fall:

  1. En column i källan table finns inte i målet table. Den nya column läggs till i målet schemaoch dess values infogas eller uppdateras med hjälp av källan values.
  2. En column i mål table finns inte i källan table. Målet schema lämnas oförändrat. values i det ytterligare målet column lämnas antingen oförändrade (för UPDATE) eller setNULL (för INSERT).

Du måste aktivera automatisk schema-utveckling manuellt. Se Aktivera schema evolution.

Kommentar

I Databricks Runtime 12.2 LTS och senare kan columns och strukturfält som finns i källan table specificeras med namn i insert eller update-åtgärder. I Databricks Runtime 11.3 LTS och nedan kan endast INSERT * eller UPDATE SET * åtgärder användas för schema utveckling med sammanslagning.

I Databricks Runtime 13.3 LTS och senare kan du använda schema utveckling med structs kapslade inuti kartor, till exempel map<int, struct<a: int, b: int>>.

Schema evolutionssyntax för sammanslagning

I Databricks Runtime 15.2 och senare kan du ange schema-utveckling i en sammanfogningsinstruktion genom att använda SQL- eller Delta table-API:er.

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exempelåtgärder för sammanslagning med schema utveckling

Här följer några exempel på effekterna av merge drift med och utan schema utveckling.

Columns Fråga (i SQL) Beteende utan schema utveckling (standardinställning) Beteende i samband med schema-utveckling
Mål columns: key, value

Källa columns: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Den tableschema förblir oförändrad; endast columnskey, value uppdateras/infogas. table schema ändras till (key, value, new_value). Befintliga poster med matchningar uppdateras med value och new_value i källan. Nya rader infogas med schema(key, value, new_value).
Mål columns: key, old_value

Källa columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE- och INSERT-åtgärder utlöser ett fel eftersom mål-columnold_value inte finns i källan. table schema ändras till (key, old_value, new_value). Befintliga poster med matchningar uppdateras med new_value i källan lämnar old_value oförändrade. Nya poster infogas med angiven key, new_valueoch NULL för old_value.
Mål columns: key, old_value

Källa columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE utlöser ett fel eftersom columnnew_value inte finns i målet table. table schema ändras till (key, old_value, new_value). Befintliga poster med matchningar uppdateras med new_value i källan lämnar old_value oförändrade och omatchade poster har NULL angetts för new_value. Se anteckning (1).
Mål columns: key, old_value

Källa columns: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT utlöser ett fel eftersom columnnew_value inte finns i målet table. table schema ändras till (key, old_value, new_value). Nya poster infogas med angiven key, new_valueoch NULL för old_value. Befintliga poster har NULL angetts för new_value att lämna old_value oförändrade. Se anteckning (1).

(1) Det här beteendet är tillgängligt i Databricks Runtime 12.2 LTS och senare. Databricks Runtime 11.3 LTS och nedanstående fel i det här villkoret.

Exkludera columns med Delta Lake-sammanslagning

I Databricks Runtime 12.2 LTS och senare kan du använda EXCEPT-satser i kopplingsvillkor för att uttryckligen undanta columns. Beteendet för nyckelordet EXCEPT varierar beroende på om schema utveckling är aktiverad eller inte.

Med schema utveckling inaktiverad gäller nyckelordet EXCEPT för list av columns i mål table och tillåter att columns undantas från UPDATE eller INSERT åtgärder. Exkluderade är set till nullfrån columns.

När utvecklingen av schema är aktiverad gäller nyckelordet EXCEPT för list av columns i källan table och tillåter uteslutandet av columns från utvecklingen av schema. En ny column i källan som inte finns i måldokumentet läggs inte till i schema i måldokumentet om den finns med i EXCEPT-klausulen. Exkluderar columns som redan finns i målet från set till null.

Följande exempel visar den här syntaxen:

Columns Fråga (i SQL) Beteende utan schema utveckling (standard) Utvecklingen av beteende med schema
Mål columns: id, title, last_updated

Källa columns: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum. Nya rader infogas med values för id och title. Det exkluderade fältet last_updated är set till null. Fältet review ignoreras eftersom det inte finns i målet. Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum. Schema har utvecklats för att lägga till fältet review. Nya rader infogas med alla källfält utom last_updated, vilket är set till null.
Mål columns: id, title, last_updated

Källa columns: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT utlöser ett fel eftersom columninternal_count inte finns i målet table. Matchade rader uppdateras genom att fältet anges last_updated till aktuellt datum. Fältet review läggs till i målfältet table, men fältet internal_count ignoreras. Nya infogade rader har last_updatedset till null.

Hantera NullTypecolumns i schema uppdateringar

Eftersom Parquet inte stöder NullTypetas NullTypecolumns bort från DataFrame när du skriver till Delta tables, men lagras fortfarande i schema. När en annan datatyp tas emot för den columnsammanfogar Delta Lake schema till den nya datatypen. Om Delta Lake tar emot en NullType för en befintlig column, behålls den gamla schema och den nya column tas bort vid skrivning.

NullType i strömning stöds inte. Eftersom du måste set scheman när du använder strömmande bör detta vara mycket sällsynt. NullType accepteras inte heller för komplexa typer som ArrayType och MapType.

Ersätt tableschema

Om du skriver över data i en table skrivs inte schemaöver som standard. När du skriver över en table med mode("overwrite") utan replaceWherekanske du fortfarande vill skriva över schema av de data som skrivs. Du ersätter schema och partitionering av table genom att ange alternativet overwriteSchema till true:

df.write.option("overwriteSchema", "true")

Viktigt!

Du kan inte ange overwriteSchema som true när du använder dynamisk partition skriv över.