Översikt över Linux Foundation Delta Lake
Den här artikeln har anpassats för tydlighetens skull från den ursprungliga motsvarigheten här. Den här artikeln hjälper dig att snabbt utforska de viktigaste funktionerna i Delta Lake. Artikeln innehåller kodfragment som visar hur du läser från och skriver till Delta Lake-tabeller från interaktiva frågor, batchfrågor och strömningsfrågor. Kodfragmenten är också tillgängliga i en uppsättning notebook-filer PySpark här, Scala här och C# här
Här är vad vi kommer att täcka:
- Skapa en tabell
- Läsa data
- Uppdatera tabelldata
- Skriv över tabelldata
- Villkorlig uppdatering utan överskrivning
- Läsa äldre versioner av data med time travel
- Skriva en dataström till en tabell
- Läsa en dataström med ändringar från en tabell
- SQL-stöd
Konfiguration
Se till att du ändrar nedanstående efter behov för din miljö.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Resulterar i:
'/delta/delta-table-335323'
Skapa en tabell
Om du vill skapa en Delta Lake-tabell skriver du en DataFrame ut en DataFrame i deltaformat. Du kan ändra formatet från Parquet, CSV, JSON och så vidare till delta.
Koden nedan visar hur du skapar en ny Delta Lake-tabell med hjälp av schemat som härleds från din DataFrame.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Resulterar i:
ID |
---|
0 |
1 |
2 |
3 |
4 |
Läsa data
Du läser data i Delta Lake-tabellen genom att ange sökvägen till filerna och deltaformatet.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Resulterar i:
ID |
---|
1 |
3 |
4 |
0 |
2 |
Resultatordningen skiljer sig från ovan eftersom ingen ordning uttryckligen angavs innan resultatet matas ut.
Uppdatera tabelldata
Delta Lake stöder flera åtgärder för att ändra tabeller med hjälp av standard-API:er för DataFrame. Dessa åtgärder är en av förbättringarna som deltaformatet lägger till. I följande exempel körs ett batchjobb för att skriva över data i tabellen.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Resulterar i:
ID |
---|
7 |
8 |
5 |
9 |
6 |
Här kan du se att alla fem posterna har uppdaterats för att innehålla nya värden.
Spara som katalogtabeller
Delta Lake kan skriva till hanterade eller externa katalogtabeller.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Resulterar i:
databas | tableName | isTemporary |
---|---|---|
standard | externaldeltatable | falskt |
standard | manageddeltatable | falskt |
Med den här koden skapade du en ny tabell i katalogen från en befintlig dataram, som kallas en hanterad tabell. Sedan definierade du en ny extern tabell i katalogen som använder en befintlig plats, som kallas för en extern tabell. I utdata visas båda tabellerna, oavsett hur de skapades, i katalogen.
Nu kan du titta på de utökade egenskaperna för båda dessa tabeller
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Resulterar i:
col_name | data_type | kommentar |
---|---|---|
id | bigint | null |
Detaljerad tabellinformation | ||
Databas | standard | |
Tabell | manageddeltatable | |
Ägare | trusted-service-user | |
Genereringstid | lör 25 apr 00:35:34 UTC 2020 | |
Senaste åtkomst | Tor jan 01 00:00:00 UTC 1970 | |
Skapad av | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | HANTERADE | |
Leverantör | delta | |
Tabellegenskaper | [transient_lastDdlTime=1587774934] | |
Statistik | 2 407 byte | |
Location | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< arbetsytans namn>/lager/manageddeltatable | |
Serde-bibliotek | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Lagringsegenskaper | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Resultat i:
col_name | data_type | kommentar |
---|---|---|
id | bigint | null |
Detaljerad tabellinformation | ||
Databas | standard | |
Tabell | externaldeltatable | |
Ägare | trusted-service-user | |
Genereringstid | Lör 25 apr 00:35:38 UTC 2020 | |
Senaste åtkomst | Tor jan 01 00:00:00 UTC 1970 | |
Skapad av | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | EXTERNA | |
Leverantör | DELTA | |
Tabellegenskaper | [transient_lastDdlTime=1587774938] | |
Location | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Serde-bibliotek | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Lagringsegenskaper | [serialization.format=1] |
Villkorsstyrd uppdatering utan att skriva över
Delta Lake tillhandahåller programmatiska API:er för villkorlig uppdatering, borttagning och sammanslagning (det här kommandot kallas ofta för en upsert) data i tabeller.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Resultat i:
ID |
---|
106 |
108 |
5 |
7 |
9 |
Här har du precis lagt till 100 till varje jämnt ID.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Resultat i:
ID |
---|
5 |
7 |
9 |
Observera att varje jämn rad har tagits bort.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Resultat i:
ID |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Här har du en kombination av befintliga data. Befintliga data har tilldelats värdet -1 i kodsökvägen update(WhenMatched). De nya data som skapades överst i kodfragmentet och lades till via sökvägen infoga kod (WhenNotMatched) lades också till.
Historik
Delta Lake's har möjlighet att göra det möjligt att titta på historiken för en tabell. Det vill: de ändringar som har gjorts i den underliggande deltatabellen. Cellen nedan visar hur enkelt det är att inspektera historiken.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Resultat i:
version | timestamp | userId | userName | operation | operationParameters | jobb | notebook-fil | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | null | null | SAMMANFOGA | [predikat -> (oldData.ID = newData.ID )] |
null | null | null | 3 | null | falskt |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
null | null | null | 2 | null | falskt |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | falskt |
1 | 2020-04-25 00:35:05 | null | null | SKRIVA | [mode -> Skriv över, partitionBy –> []] | null | null | null | 0 | null | falskt |
0 | 2020-04-25 00:34:34 | null | null | SKRIVA | [mode -> ErrorIfExists, partitionBy -> []] | null | null | null | null | null | true |
Här kan du se alla ändringar som gjorts i kodfragmenten ovan.
Läsa äldre versioner av data med time travel
Du kan köra frågor mot tidigare ögonblicksbilder av Din Delta Lake-tabell med hjälp av en funktion som kallas Tidsresor. Om du vill komma åt de data som du skrev över kan du fråga en ögonblicksbild av tabellen innan du skriver över den första datauppsättningen med hjälp av alternativet versionAsOf.
När du har kört cellen nedan bör du se den första uppsättningen data från innan du skriver över den. Time Travel är en kraftfull funktion som utnyttjar kraften i Delta Lake-transaktionsloggen för att komma åt data som inte längre finns i tabellen. Om du tar bort alternativet version 0 (eller anger version 1) kan du se nyare data igen. Mer information finns i Fråga en äldre ögonblicksbild av en tabell.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Resulterar i:
ID |
---|
0 |
1 |
4 |
3 |
2 |
Här kan du se att du har gått tillbaka till den tidigaste versionen av data.
Skriva en dataström till en tabell
Du kan också skriva till en Delta Lake-tabell med hjälp av Sparks Structured Streaming. Delta Lake-transaktionsloggen garanterar bearbetning exakt en gång, även om det finns andra strömmar eller batchfrågor som körs samtidigt mot tabellen. Som standard körs strömmar i tilläggsläge, vilket lägger till nya poster i tabellen.
Mer information om Delta Lake-integrering med Structured Streaming finns i Läsa och skriva för tabellströmning.
I cellerna nedan gör vi följande:
- Cell 30 Visa nyligen tillagda data
- Cell 31 Inspektera historik
- Cell 32 Stoppa det strukturerade direktuppspelningsjobbet
- Cell 33 Inspektera historik <– Du kommer att märka att tilläggen har stoppats
Först ska du konfigurera ett enkelt Spark Streaming-jobb för att generera en sekvens och göra jobbet skrivet till deltatabellen.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Läsa en dataström med ändringar från en tabell
När strömmen skrivs till Delta Lake-tabellen kan du också läsa från den tabellen som en strömningskälla. Du kan till exempel starta en annan strömmande fråga som skriver ut alla ändringar som gjorts i Delta Lake-tabellen.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Resulterar i:
ID |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Resulterar i:
version | timestamp | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | DIREKTUPPSPELNINGSUPPDATERING | [outputMode –> Tillägg, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | SAMMANFOGA | [predikat -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | SKRIVA | [mode -> Skriv över, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | SKRIVA | [mode -> ErrorIfExists, partitionBy -> []] | null |
Här släpper du några av de mindre intressanta kolumnerna för att förenkla visningsupplevelsen i historikvyn.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Resulterar i:
version | timestamp | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | DIREKTUPPSPELNINGSUPPDATERING | [outputMode –> Tillägg, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | SAMMANFOGA | [predikat -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | SKRIVA | [mode -> Skriv över, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | SKRIVA | [mode -> ErrorIfExists, partitionBy -> []] | null |
Konvertera Parquet till Delta
Du kan göra en konvertering på plats från Parquet-formatet till Delta.
Här ska du testa om den befintliga tabellen är i deltaformat eller inte.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Resulterar i:
Falskt
Nu ska du konvertera data till deltaformat och kontrollera att de fungerade.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resulterar i:
Sant
SQL-stöd
Delta stöder tabellverktygskommandon via SQL. Du kan använda SQL för att:
- Hämta deltatabellens historik
- Dammsuga en DeltaTable
- Konvertera en Parquet-fil till Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Resultat i:
version | timestamp | userId | userName | operation | operationParameters | jobb | notebook-fil | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | null | null | DIREKTUPPSPELNINGSUPPDATERING | [outputMode –> Ap... | null | null | null | 4 | null | true |
4 | 2020-04-25 00:36:27 | null | null | SAMMANFOGA | [predikat -> (ol... | null | null | null | 3 | null | falskt |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikat -> ["(... | null | null | null | 2 | null | falskt |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikat -> ((i... | null | null | null | 1 | null | falskt |
1 | 2020-04-25 00:35:05 | null | null | SKRIVA | [mode -> Överskrivning... | null | null | null | 0 | null | falskt |
0 | 2020-04-25 00:34:34 | null | null | SKRIVA | [mode -> ErrorIfE... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Resultat i:
path |
---|
abfss://data@arca... |
Nu ska du kontrollera att en tabell inte är en deltaformattabell. Sedan konverterar du tabellen till deltaformat med Spark SQL och bekräftar att den har konverterats korrekt.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resultat i:
Sant
Fullständig dokumentation finns på Delta Lake-dokumentationssidan
Mer information finns i Delta Lake Project.