Přehled Linux Foundation Delta Lake
Tento článek byl upraven tak, aby byl srozumitelnější než jeho původní protějšek zde. Tento článek vám pomůže rychle prozkoumat hlavní funkce Delta Lake. Tento článek obsahuje fragmenty kódu, které ukazují, jak číst a zapisovat do tabulek Delta Lake z interaktivních, dávkových a streamovaných dotazů. Fragmenty kódu jsou také dostupné v sadě poznámkových bloků PySpark tady, Scala here a C# tady.
Budeme se zabývat následujícími informacemi:
- Vytvoření tabulky
- Čtení dat
- Aktualizace dat tabulky
- Přepsání dat tabulky
- Podmíněná aktualizace bez přepsání
- Čtení starších verzí dat pomocí funkce Time Travel
- Zápis datového proudu do tabulky
- Čtení streamu změn z tabulky
- Podpora SQL
Konfigurace
Nezapomeňte upravit následující položky tak, aby byly vhodné pro vaše prostředí.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Výsledky v:
'/delta/delta-table-335323'
Vytvoření tabulky
Pokud chcete vytvořit tabulku Delta Lake, zapište datový rámec do datového rámce ve formátu delta. Formát můžete změnit z Parquet, CSV, JSON atd. na delta.
Následující kód ukazuje, jak vytvořit novou tabulku Delta Lake pomocí schématu odvozeného z datového rámce.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Výsledky v:
ID |
---|
0 |
1 |
2 |
3 |
4 |
Čtení dat
Data v tabulce Delta Lake můžete číst zadáním cesty k souborům a formátu delta.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Výsledky v:
ID |
---|
1 |
3 |
4 |
0 |
2 |
Pořadí výsledků se liší od výše uvedeného, protože před výstupem výsledků nebylo explicitně zadáno žádné pořadí.
Aktualizace dat tabulky
Delta Lake podporuje několik operací pro úpravu tabulek pomocí standardních rozhraní API datových rámců. Tyto operace jsou jedním z vylepšení, která přidává rozdílový formát. Následující příklad spustí dávkovou úlohu, která přepíše data v tabulce.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Výsledky v:
ID |
---|
7 |
8 |
5 |
9 |
6 |
Tady vidíte, že všech pět záznamů bylo aktualizováno tak, aby se vešly nové hodnoty.
Uložit jako tabulky katalogu
Delta Lake může zapisovat do spravovaných nebo externích tabulek katalogu.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Výsledky v:
database | tableName | isTemporary |
---|---|---|
default | externaldeltatable | false (nepravda) |
default | manageddeltatable | false (nepravda) |
Pomocí tohoto kódu jste vytvořili novou tabulku v katalogu z existujícího datového rámce označovaného jako spravovaná tabulka. Potom jste v katalogu definovali novou externí tabulku, která používá existující umístění označované jako externí tabulka. Ve výstupu vidíte, že obě tabulky jsou v katalogu uvedené bez ohledu na to, jak byly vytvořeny.
Teď se můžete podívat na rozšířené vlastnosti obou těchto tabulek.
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Výsledky v:
col_name | Data_type | comment |
---|---|---|
id | bigint | null |
Podrobné informace o tabulce | ||
Databáze | default | |
Tabulka | manageddeltatable | |
Vlastník | trusted-service-user | |
Čas vytvoření | So Dub 25 00:35:34 UTC 2020 | |
Poslední přístup | Čt jan 01 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | SPRAVOVANÉ | |
Poskytovatel | Delta | |
Vlastnosti tabulky | [transient_lastDdlTime=1587774934] | |
Statistika | 2407 bajtů | |
Umístění | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< název> pracovního prostoru/warehouse/manageddeltatable | |
Knihovna Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
Výstupní formát | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Vlastnosti úložiště | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Výsledky v:
col_name | Data_type | comment |
---|---|---|
id | bigint | null |
Podrobné informace o tabulce | ||
Databáze | default | |
Tabulka | externaldeltatable | |
Vlastník | trusted-service-user | |
Čas vytvoření | So Dub 25 00:35:38 UTC 2020 | |
Poslední přístup | Čt jan 01 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | EXTERNÍ | |
Poskytovatel | DELTA | |
Vlastnosti tabulky | [transient_lastDdlTime=1587774938] | |
Umístění | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Knihovna Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
Výstupní formát | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Vlastnosti úložiště | [serialization.format=1] |
Podmíněná aktualizace bez přepsání
Delta Lake poskytuje programová rozhraní API pro podmíněnou aktualizaci, odstranění a sloučení dat do tabulek (tento příkaz se běžně označuje jako upsert).
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Výsledky v:
ID |
---|
106 |
108 |
5 |
7 |
9 |
Tady jste právě přidali 100 k každému sudé ID.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Výsledky v:
ID |
---|
5 |
7 |
9 |
Všimněte si, že všechny sudé řádky byly odstraněny.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Výsledky v:
ID |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Tady máte kombinaci existujících dat. Stávajícím datům byla v cestě kódu update(WhenMatched) přiřazena hodnota -1. Byla přidána také nová data, která byla vytvořena v horní části fragmentu kódu a byla přidána prostřednictvím cesty pro vložení kódu (WhenNotMatched).
Historie
Delta Lake má schopnost nahlížet do historie tabulky. To znamená změny provedené v podkladové tabulce Delta. Následující buňka ukazuje, jak jednoduché je zkontrolovat historii.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Výsledky v:
verze | časové razítko | userId | userName | operation | operationParameters | úloha | poznámkový blok | clusterId | readVersion | Isolationlevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | null | null | SLOUČIT | [predikát -> (oldData.ID = newData.ID )] |
null | null | null | 3 | null | false (nepravda) |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikát -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
null | null | null | 2 | null | false (nepravda) |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikát -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | false (nepravda) |
1 | 2020-04-25 00:35:05 | null | null | ZÁPIS | [režim -> Overwrite, partitionBy -> []] | null | null | null | 0 | null | false (nepravda) |
0 | 2020-04-25 00:34:34 | null | null | ZÁPIS | [režim -> ErrorIfExists, partitionBy –> []] | null | null | null | null | null | true |
Tady si můžete prohlédnout všechny změny provedené u výše uvedených fragmentů kódu.
Čtení starších verzí dat pomocí funkce Time Travel
Na předchozí snímky tabulky Delta Lake se můžete dotazovat pomocí funkce s názvem Cesta v čase. Pokud chcete získat přístup k datům, která přepíšete, můžete před přepsáním první sady dat pomocí možnosti versionAsOf zadat dotaz na snímek tabulky.
Jakmile buňku níže spustíte, měli byste před přepsáním zobrazit první sadu dat z. Cestování v čase je výkonná funkce, která využívá výkon transakčního protokolu Delta Lake pro přístup k datům, která už nejsou v tabulce. Odebráním možnosti verze 0 (nebo zadáním verze 1) znovu zobrazíte novější data. Další informace najdete v tématu Dotazování na starší snímek tabulky.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Výsledky v:
ID |
---|
0 |
1 |
4 |
3 |
2 |
Tady vidíte, že jste se vrátili k nejstarší verzi dat.
Zápis streamu dat do tabulky
Do tabulky Delta Lake můžete také zapisovat pomocí strukturovaného streamování Sparku. Transakční protokol Delta Lake zaručuje přesně jedno zpracování, a to i v případě, že jsou v tabulce souběžně spuštěné další streamy nebo dávkové dotazy. Ve výchozím nastavení se streamy spouští v režimu připojení, který do tabulky přidává nové záznamy.
Další informace o integraci Delta Lake se strukturovaným streamováním najdete v tématu Čtení a zápisy streamování tabulek.
V následujících buňkách děláme toto:
- Buňka 30 : Zobrazení nově připojených dat
- Buňka 31 – historie kontroly
- Buňka 32 – Zastavení úlohy strukturovaného streamování
- Buňka 33 – Zkontrolujte historii <– Všimněte si, že připojení se zastavila
Nejprve nastavíte jednoduchou úlohu streamování Sparku, která vygeneruje sekvenci a zapíše ji do tabulky Delta.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Čtení proudu změn z tabulky
Zatímco stream zapisuje do tabulky Delta Lake, můžete z této tabulky také číst jako zdroj streamování. Můžete například spustit další dotaz streamování, který vytiskne všechny změny provedené v tabulce Delta Lake.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Výsledky v:
ID |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Výsledky v:
verze | časové razítko | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | AKTUALIZACE STREAMOVÁNÍ | [outputMode –> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] | 4 |
4 | 2020-04-25 00:36:27 | SLOUČIT | [predikát -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikát -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikát -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint)))] | 1 |
1 | 2020-04-25 00:35:05 | ZÁPIS | [režim –> Overwrite, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | ZÁPIS | [režim –> ErrorIfExists, partitionBy –> []] | null |
Tady vypouštíte některé z méně zajímavých sloupců, aby se zjednodušilo prohlížení zobrazení historie.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Výsledky v:
verze | časové razítko | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | AKTUALIZACE STREAMOVÁNÍ | [outputMode –> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] | 4 |
4 | 2020-04-25 00:36:27 | SLOUČIT | [predikát -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predikát -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predikát -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint)))] | 1 |
1 | 2020-04-25 00:35:05 | ZÁPIS | [režim –> Overwrite, partitionBy –> []] | 0 |
0 | 2020-04-25 00:34:34 | ZÁPIS | [režim –> ErrorIfExists, partitionBy –> []] | null |
Převod Parquetu na Delta
Můžete provést místní převod z formátu Parquet na Delta.
Tady otestujete, jestli je existující tabulka v rozdílovém formátu.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Výsledky v:
Ne
Teď převedete data do rozdílového formátu a ověříte, že fungovala.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Výsledky v:
Ano
Podpora SQL
Delta podporuje příkazy tabulkových nástrojů prostřednictvím SQL. SQL můžete použít k:
- Získání historie deltatable
- Vysávání tabulky DeltaTable
- Převod souboru Parquet na Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Výsledky v:
verze | časové razítko | userId | userName | operation | operationParameters | úloha | poznámkový blok | clusterId | readVersion | Isolationlevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | null | null | AKTUALIZACE STREAMOVÁNÍ | [outputMode –> ap... | null | null | null | 4 | null | true |
4 | 2020-04-25 00:36:27 | null | null | SLOUČIT | [predikát -> (ol... | null | null | null | 3 | null | false (nepravda) |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predikát -> ["(... | null | null | null | 2 | null | false (nepravda) |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predikát -> ((i... | null | null | null | 1 | null | false (nepravda) |
1 | 2020-04-25 00:35:05 | null | null | ZÁPIS | [režim –> Přepsaný... | null | null | null | 0 | null | false (nepravda) |
0 | 2020-04-25 00:34:34 | null | null | ZÁPIS | [režim –> ErrorIfe... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Výsledky v:
program |
---|
abfss://data@arca... |
Teď ověříte, že tabulka není tabulka s rozdílovým formátem. Potom pomocí Spark SQL převedete tabulku do rozdílového formátu a ověříte, že byla převedena správně.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Výsledky v:
Ano
Úplnou dokumentaci najdete na stránce dokumentace k Delta Lake.
Další informace najdete v tématu Projekt Delta Lake.