Sdílet prostřednictvím


Přehled Linux Foundation Delta Lake

Tento článek byl upraven tak, aby byl srozumitelnější než jeho původní protějšek zde. Tento článek vám pomůže rychle prozkoumat hlavní funkce Delta Lake. Tento článek obsahuje fragmenty kódu, které ukazují, jak číst a zapisovat do tabulek Delta Lake z interaktivních, dávkových a streamovaných dotazů. Fragmenty kódu jsou také dostupné v sadě poznámkových bloků PySpark tady, Scala here a C# tady.

Budeme se zabývat následujícími informacemi:

  • Vytvoření tabulky
  • Čtení dat
  • Aktualizace dat tabulky
  • Přepsání dat tabulky
  • Podmíněná aktualizace bez přepsání
  • Čtení starších verzí dat pomocí funkce Time Travel
  • Zápis datového proudu do tabulky
  • Čtení streamu změn z tabulky
  • Podpora SQL

Konfigurace

Nezapomeňte upravit následující položky tak, aby byly vhodné pro vaše prostředí.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Výsledky v:

'/delta/delta-table-335323'

Vytvoření tabulky

Pokud chcete vytvořit tabulku Delta Lake, zapište datový rámec do datového rámce ve formátu delta. Formát můžete změnit z Parquet, CSV, JSON atd. na delta.

Následující kód ukazuje, jak vytvořit novou tabulku Delta Lake pomocí schématu odvozeného z datového rámce.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Výsledky v:

ID
0
1
2
3
4

Čtení dat

Data v tabulce Delta Lake můžete číst zadáním cesty k souborům a formátu delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Výsledky v:

ID
1
3
4
0
2

Pořadí výsledků se liší od výše uvedeného, protože před výstupem výsledků nebylo explicitně zadáno žádné pořadí.

Aktualizace dat tabulky

Delta Lake podporuje několik operací pro úpravu tabulek pomocí standardních rozhraní API datových rámců. Tyto operace jsou jedním z vylepšení, která přidává rozdílový formát. Následující příklad spustí dávkovou úlohu, která přepíše data v tabulce.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Výsledky v:

ID
7
8
5
9
6

Tady vidíte, že všech pět záznamů bylo aktualizováno tak, aby se vešly nové hodnoty.

Uložit jako tabulky katalogu

Delta Lake může zapisovat do spravovaných nebo externích tabulek katalogu.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Výsledky v:

database tableName isTemporary
default externaldeltatable false (nepravda)
default manageddeltatable false (nepravda)

Pomocí tohoto kódu jste vytvořili novou tabulku v katalogu z existujícího datového rámce označovaného jako spravovaná tabulka. Potom jste v katalogu definovali novou externí tabulku, která používá existující umístění označované jako externí tabulka. Ve výstupu vidíte, že obě tabulky jsou v katalogu uvedené bez ohledu na to, jak byly vytvořeny.

Teď se můžete podívat na rozšířené vlastnosti obou těchto tabulek.

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Výsledky v:

col_name Data_type comment
id bigint null
Podrobné informace o tabulce
Databáze default
Tabulka manageddeltatable
Vlastník trusted-service-user
Čas vytvoření So Dub 25 00:35:34 UTC 2020
Poslední přístup Čt jan 01 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Typ SPRAVOVANÉ
Poskytovatel Delta
Vlastnosti tabulky [transient_lastDdlTime=1587774934]
Statistika 2407 bajtů
Umístění abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< název> pracovního prostoru/warehouse/manageddeltatable
Knihovna Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
Výstupní formát org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Vlastnosti úložiště [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Výsledky v:

col_name Data_type comment
id bigint null
Podrobné informace o tabulce
Databáze default
Tabulka externaldeltatable
Vlastník trusted-service-user
Čas vytvoření So Dub 25 00:35:38 UTC 2020
Poslední přístup Čt jan 01 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Typ EXTERNÍ
Poskytovatel DELTA
Vlastnosti tabulky [transient_lastDdlTime=1587774938]
Umístění abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152>
Knihovna Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
Výstupní formát org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Vlastnosti úložiště [serialization.format=1]

Podmíněná aktualizace bez přepsání

Delta Lake poskytuje programová rozhraní API pro podmíněnou aktualizaci, odstranění a sloučení dat do tabulek (tento příkaz se běžně označuje jako upsert).

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Výsledky v:

ID
106
108
5
7
9

Tady jste právě přidali 100 k každému sudé ID.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Výsledky v:

ID
5
7
9

Všimněte si, že všechny sudé řádky byly odstraněny.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Výsledky v:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Tady máte kombinaci existujících dat. Stávajícím datům byla v cestě kódu update(WhenMatched) přiřazena hodnota -1. Byla přidána také nová data, která byla vytvořena v horní části fragmentu kódu a byla přidána prostřednictvím cesty pro vložení kódu (WhenNotMatched).

Historie

Delta Lake má schopnost nahlížet do historie tabulky. To znamená změny provedené v podkladové tabulce Delta. Následující buňka ukazuje, jak jednoduché je zkontrolovat historii.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Výsledky v:

verze časové razítko userId userName operation operationParameters úloha poznámkový blok clusterId readVersion Isolationlevel isBlindAppend
4 2020-04-25 00:36:27 null null SLOUČIT [predikát -> (oldData.ID = newData.ID)] null null null 3 null false (nepravda)
3 2020-04-25 00:36:08 null null DELETE [predikát -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null false (nepravda)
2 2020-04-25 00:35:51 null null UPDATE [predikát -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false (nepravda)
1 2020-04-25 00:35:05 null null ZÁPIS [režim -> Overwrite, partitionBy -> []] null null null 0 null false (nepravda)
0 2020-04-25 00:34:34 null null ZÁPIS [režim -> ErrorIfExists, partitionBy –> []] null null null null null true

Tady si můžete prohlédnout všechny změny provedené u výše uvedených fragmentů kódu.

Čtení starších verzí dat pomocí funkce Time Travel

Na předchozí snímky tabulky Delta Lake se můžete dotazovat pomocí funkce s názvem Cesta v čase. Pokud chcete získat přístup k datům, která přepíšete, můžete před přepsáním první sady dat pomocí možnosti versionAsOf zadat dotaz na snímek tabulky.

Jakmile buňku níže spustíte, měli byste před přepsáním zobrazit první sadu dat z. Cestování v čase je výkonná funkce, která využívá výkon transakčního protokolu Delta Lake pro přístup k datům, která už nejsou v tabulce. Odebráním možnosti verze 0 (nebo zadáním verze 1) znovu zobrazíte novější data. Další informace najdete v tématu Dotazování na starší snímek tabulky.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Výsledky v:

ID
0
1
4
3
2

Tady vidíte, že jste se vrátili k nejstarší verzi dat.

Zápis streamu dat do tabulky

Do tabulky Delta Lake můžete také zapisovat pomocí strukturovaného streamování Sparku. Transakční protokol Delta Lake zaručuje přesně jedno zpracování, a to i v případě, že jsou v tabulce souběžně spuštěné další streamy nebo dávkové dotazy. Ve výchozím nastavení se streamy spouští v režimu připojení, který do tabulky přidává nové záznamy.

Další informace o integraci Delta Lake se strukturovaným streamováním najdete v tématu Čtení a zápisy streamování tabulek.

V následujících buňkách děláme toto:

  • Buňka 30 : Zobrazení nově připojených dat
  • Buňka 31 – historie kontroly
  • Buňka 32 – Zastavení úlohy strukturovaného streamování
  • Buňka 33 – Zkontrolujte historii <– Všimněte si, že připojení se zastavila

Nejprve nastavíte jednoduchou úlohu streamování Sparku, která vygeneruje sekvenci a zapíše ji do tabulky Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Čtení proudu změn z tabulky

Zatímco stream zapisuje do tabulky Delta Lake, můžete z této tabulky také číst jako zdroj streamování. Můžete například spustit další dotaz streamování, který vytiskne všechny změny provedené v tabulce Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Výsledky v:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Výsledky v:

verze časové razítko operation operationParameters readVersion
5 2020-04-25 00:37:09 AKTUALIZACE STREAMOVÁNÍ [outputMode –> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] 4
4 2020-04-25 00:36:27 SLOUČIT [predikát -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predikát -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predikát -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint)))] 1
1 2020-04-25 00:35:05 ZÁPIS [režim –> Overwrite, partitionBy –> []] 0
0 2020-04-25 00:34:34 ZÁPIS [režim –> ErrorIfExists, partitionBy –> []] null

Tady vypouštíte některé z méně zajímavých sloupců, aby se zjednodušilo prohlížení zobrazení historie.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Výsledky v:

verze časové razítko operation operationParameters readVersion
5 2020-04-25 00:37:09 AKTUALIZACE STREAMOVÁNÍ [outputMode –> Append, queryId –> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId –> 0] 4
4 2020-04-25 00:36:27 SLOUČIT [predikát -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predikát -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predikát -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint)))] 1
1 2020-04-25 00:35:05 ZÁPIS [režim –> Overwrite, partitionBy –> []] 0
0 2020-04-25 00:34:34 ZÁPIS [režim –> ErrorIfExists, partitionBy –> []] null

Převod Parquetu na Delta

Můžete provést místní převod z formátu Parquet na Delta.

Tady otestujete, jestli je existující tabulka v rozdílovém formátu.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Výsledky v:

Ne

Teď převedete data do rozdílového formátu a ověříte, že fungovala.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Výsledky v:

Ano

Podpora SQL

Delta podporuje příkazy tabulkových nástrojů prostřednictvím SQL. SQL můžete použít k:

  • Získání historie deltatable
  • Vysávání tabulky DeltaTable
  • Převod souboru Parquet na Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Výsledky v:

verze časové razítko userId userName operation operationParameters úloha poznámkový blok clusterId readVersion Isolationlevel isBlindAppend
5 2020-04-25 00:37:09 null null AKTUALIZACE STREAMOVÁNÍ [outputMode –> ap... null null null 4 null true
4 2020-04-25 00:36:27 null null SLOUČIT [predikát -> (ol... null null null 3 null false (nepravda)
3 2020-04-25 00:36:08 null null DELETE [predikát -> ["(... null null null 2 null false (nepravda)
2 2020-04-25 00:35:51 null null UPDATE [predikát -> ((i... null null null 1 null false (nepravda)
1 2020-04-25 00:35:05 null null ZÁPIS [režim –> Přepsaný... null null null 0 null false (nepravda)
0 2020-04-25 00:34:34 null null ZÁPIS [režim –> ErrorIfe... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Výsledky v:

program
abfss://data@arca...

Teď ověříte, že tabulka není tabulka s rozdílovým formátem. Potom pomocí Spark SQL převedete tabulku do rozdílového formátu a ověříte, že byla převedena správně.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Výsledky v:

Ano

Úplnou dokumentaci najdete na stránce dokumentace k Delta Lake.

Další informace najdete v tématu Projekt Delta Lake.

Další kroky