Omówienie usługi Delta Lake programu Linux Foundation
Ten artykuł został dostosowany do większej jasności od oryginalnego odpowiednika tutaj. Ten artykuł ułatwia szybkie zapoznanie się z głównymi funkcjami usługi Delta Lake. Artykuł zawiera fragmenty kodu, które pokazują, jak odczytywać i zapisywać w tabelach usługi Delta Lake z interakcyjnych, wsadowych i przesyłanych strumieniowo zapytań. Fragmenty kodu są również dostępne w zestawie notesów PySpark tutaj, Scala tutaj i C# tutaj
Oto, co omówimy:
- Tworzenie tabeli
- Odczyt danych
- Aktualizowanie danych tabeli
- Zastępowanie danych tabeli
- Aktualizacja warunkowa bez zastępowania
- Odczytywanie starszych wersji danych przy użyciu funkcji Time Travel
- Zapisywanie strumienia danych w tabeli
- Odczytywanie strumienia zmian z tabeli
- Obsługa języka SQL
Konfigurowanie
Upewnij się, że zmodyfikujesz poniższe zależnie od środowiska.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Wyniki:
"/delta/delta-table-335323"
Tworzenie tabeli
Aby utworzyć tabelę usługi Delta Lake, napisz ramkę danych w formacie różnicowym. Możesz zmienić format z Parquet, CSV, JSON itd. na różnicę.
Poniższy kod pokazuje, jak utworzyć nową tabelę usługi Delta Lake przy użyciu schematu wnioskowanego z ramki danych.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Wyniki:
ID (Identyfikator) |
---|
0 |
1 |
2 |
3 |
4 |
Odczyt danych
Dane są odczytywane w tabeli usługi Delta Lake, określając ścieżkę do plików i format różnicowy.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Wyniki:
ID (Identyfikator) |
---|
1 |
3 |
4 |
0 |
2 |
Kolejność wyników różni się od powyższego, ponieważ nie określono jawnie kolejności przed wyświetleniem wyników.
Aktualizowanie danych tabeli
Usługa Delta Lake obsługuje kilka operacji modyfikowania tabel przy użyciu standardowych interfejsów API ramki danych. Te operacje są jednym z ulepszeń, które dodaje format różnicowy. Poniższy przykład uruchamia zadanie wsadowe, aby zastąpić dane w tabeli.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Wyniki:
ID (Identyfikator) |
---|
7 |
8 |
5 |
9 |
6 |
W tym miejscu widać, że wszystkie pięć rekordów zostało zaktualizowanych do przechowywania nowych wartości.
Zapisz jako tabele wykazu
Usługa Delta Lake może zapisywać w tabelach zarządzanych lub zewnętrznych wykazów.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Wyniki:
database | tableName | Istemporary |
---|---|---|
default | externaldeltatable | fałsz |
default | manageddeltatable | fałsz |
W tym kodzie utworzono nową tabelę w wykazie z istniejącej ramki danych, nazywanej tabelą zarządzaną. Następnie zdefiniowano nową tabelę zewnętrzną w katalogu, która używa istniejącej lokalizacji, nazywanej tabelą zewnętrzną. W danych wyjściowych można zobaczyć obie tabele, bez względu na sposób ich tworzenia, są wyświetlane w wykazie.
Teraz możesz przyjrzeć się rozszerzonym właściwościom obu tych tabel
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Wyniki:
col_name | Data_type | komentarz |
---|---|---|
identyfikator | bigint | null |
Szczegółowe informacje o tabeli | ||
baza danych | default | |
Tabela | manageddeltatable | |
Właściciel | zaufany użytkownik usługi | |
Czas utworzenia | Sat Apr 25 00:35:34 UTC 2020 | |
Ostatni dostęp | Czw 01 stycznia 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | ZARZĄDZANE | |
Dostawca | delta | |
Właściwości tabeli | [transient_lastDdlTime=1587774934] | |
Statystyki | 2407 bajtów | |
Lokalizacja | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< nazwa> przestrzeni pracy/magazyn/manageddeltatable | |
Biblioteka Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Właściwości magazynu | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Wyniki:
col_name | Data_type | komentarz |
---|---|---|
identyfikator | bigint | null |
Szczegółowe informacje o tabeli | ||
baza danych | default | |
Tabela | externaldeltatable | |
Właściciel | zaufany użytkownik usługi | |
Czas utworzenia | Sat Apr 25 00:35:38 UTC 2020 | |
Ostatni dostęp | Czw 01 stycznia 00:00:00 UTC 1970 | |
Created By | Spark 2.4.4.2.6.99.201-11401300 | |
Typ | ZEWNĘTRZNYCH | |
Dostawca | DELTA | |
Właściwości tabeli | [transient_lastDdlTime=1587774938] | |
Lokalizacja | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Biblioteka Serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Właściwości magazynu | [serialization.format=1] |
Aktualizacja warunkowa bez zastępowania
Usługa Delta Lake udostępnia programowe interfejsy API do aktualizacji warunkowej, usuwania i scalania (to polecenie jest często nazywane danymi upsert) w tabelach.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Wyniki:
ID (Identyfikator) |
---|
106 |
108 |
5 |
7 |
9 |
W tym miejscu właśnie dodano 100 do każdego identyfikatora parzystego.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Wyniki:
ID (Identyfikator) |
---|
5 |
7 |
9 |
Zwróć uwagę, że każdy wiersz parzystowy został usunięty.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Wyniki:
ID (Identyfikator) |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
W tym miejscu masz kombinację istniejących danych. Istniejące dane zostały przypisane do wartości -1 w ścieżce kodu update(WhenMatched). Dodano również nowe dane utworzone w górnej części fragmentu kodu i zostały dodane za pośrednictwem ścieżki kodu wstawiania (WhenNotMatched).
Historia
Usługa Delta Lake ma możliwość wglądu w historię tabeli. Oznacza to, że zmiany wprowadzone w podstawowej tabeli delty. W poniższej komórce pokazano, jak proste jest sprawdzenie historii.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Wyniki:
Wersja | sygnatura czasowa | userId | userName | operation | operationParameters | zadanie | notes | clusterId | readVersion | Isolationlevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | null | null | SCALANIA | [predykat -> (oldData.ID = newData.ID )] |
null | null | null | 3 | null | fałsz |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predykat -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
null | null | null | 2 | null | fałsz |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predykat -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | null | null | null | 1 | null | fałsz |
1 | 2020-04-25 00:35:05 | null | null | NAPISZ | [tryb —> Zastąp, partitionBy —> []] | null | null | null | 0 | null | fałsz |
0 | 2020-04-25 00:34:34 | null | null | NAPISZ | [tryb —> ErrorIfExists, partitionBy —> []] | null | null | null | null | null | true |
W tym miejscu można zobaczyć wszystkie modyfikacje wprowadzone w powyższych fragmentach kodu.
Odczytywanie starszych wersji danych przy użyciu funkcji Time Travel
Istnieje możliwość wykonywania zapytań względem poprzednich migawek tabeli usługi Delta Lake przy użyciu funkcji o nazwie Time Travel. Jeśli chcesz uzyskać dostęp do danych, które zastąpisz, możesz wykonać zapytanie dotyczące migawki tabeli przed zastąpieniem pierwszego zestawu danych przy użyciu opcji versionAsOf.
Po uruchomieniu poniższej komórki przed zastąpieniem powinien zostać wyświetlony pierwszy zestaw danych. Time Travel to zaawansowana funkcja, która wykorzystuje możliwości dziennika transakcji usługi Delta Lake w celu uzyskania dostępu do danych, które nie są już w tabeli. Usunięcie opcji w wersji 0 (lub określenie wersji 1) umożliwi ponowne wyświetlanie nowszych danych. Aby uzyskać więcej informacji, zobacz Wykonywanie zapytań o starszą migawkę tabeli.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Wyniki:
ID (Identyfikator) |
---|
0 |
1 |
4 |
3 |
2 |
W tym miejscu widać, że wrócisz do najwcześniejszej wersji danych.
Zapisywanie strumienia danych w tabeli
Możesz również napisać do tabeli usługi Delta Lake przy użyciu przesyłania strumieniowego ze strukturą platformy Spark. Dziennik transakcji usługi Delta Lake gwarantuje dokładnie jednokrotne przetwarzanie, nawet jeśli istnieją inne strumienie lub zapytania wsadowe uruchomione współbieżnie względem tabeli. Domyślnie strumienie są uruchamiane w trybie dołączania, który dodaje nowe rekordy do tabeli.
Aby uzyskać więcej informacji na temat integracji usługi Delta Lake z przesyłaniem strumieniowym ze strukturą, zobacz Odczyty i zapisy przesyłane strumieniowo tabel.
W poniższych komórkach oto, co robimy:
- Komórka 30 Pokaż nowo dołączone dane
- Komórka 31 — Inspekcja historii
- Komórka 32 Zatrzymaj ustrukturyzowane zadanie przesyłania strumieniowego
- Komórka 33 Inspekcja historii <— zauważysz, że dołączenia zostały zatrzymane
Najpierw skonfigurujesz proste zadanie przesyłania strumieniowego platformy Spark w celu wygenerowania sekwencji i zapisania zadania do tabeli delty.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Odczytywanie strumienia zmian z tabeli
Podczas zapisywania strumienia w tabeli usługi Delta Lake można również odczytać z tej tabeli jako źródło przesyłania strumieniowego. Możesz na przykład uruchomić kolejne zapytanie przesyłania strumieniowego, które wyświetla wszystkie zmiany wprowadzone w tabeli usługi Delta Lake.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Wyniki:
ID (Identyfikator) |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Wyniki:
Wersja | sygnatura czasowa | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO | [outputMode —> Dołącz, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | SCALANIA | [predykat -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predykat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predykat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | NAPISZ | [tryb —> Zastąp, partitionBy —> []] | 0 |
0 | 2020-04-25 00:34:34 | NAPISZ | [tryb —> ErrorIfExists, partitionBy —> []] | null |
W tym miejscu upuszczasz niektóre z mniej interesujących kolumn, aby uprościć wyświetlanie widoku historii.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Wyniki:
Wersja | sygnatura czasowa | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO | [outputMode —> Dołącz, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | SCALANIA | [predykat -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predykat -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predykat -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | NAPISZ | [tryb —> Zastąp, partitionBy —> []] | 0 |
0 | 2020-04-25 00:34:34 | NAPISZ | [tryb —> ErrorIfExists, partitionBy —> []] | null |
Konwertowanie parquet na różnicę
Konwersję w miejscu można wykonać z formatu Parquet na różnicę.
W tym miejscu sprawdzisz, czy istniejąca tabela ma format różnicowy, czy nie.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Wyniki:
Fałsz
Teraz przekonwertujesz dane na format różnicowy i sprawdzisz, czy to zadziałało.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Wyniki:
Prawda
Obsługa języka SQL
Funkcja Delta obsługuje polecenia narzędzia tabel za pomocą języka SQL. Za pomocą programu SQL można wykonywać następujące czynności:
- Uzyskiwanie historii tabeli DeltaTable
- Opróżnij tabelę deltaTable
- Konwertowanie pliku Parquet na różnicę
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Wyniki:
Wersja | sygnatura czasowa | userId | userName | operation | operationParameters | zadanie | notes | clusterId | readVersion | Isolationlevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | null | null | AKTUALIZACJA PRZESYŁANIA STRUMIENIOWEGO | [outputMode —> Ap... | null | null | null | 4 | null | true |
4 | 2020-04-25 00:36:27 | null | null | SCALANIA | [predykat -> (ol... | null | null | null | 3 | null | fałsz |
3 | 2020-04-25 00:36:08 | null | null | DELETE | [predykat -> ["(... | null | null | null | 2 | null | fałsz |
2 | 2020-04-25 00:35:51 | null | null | UPDATE | [predykat -> ((i... | null | null | null | 1 | null | fałsz |
1 | 2020-04-25 00:35:05 | null | null | NAPISZ | [tryb —> Zastępowanie... | null | null | null | 0 | null | fałsz |
0 | 2020-04-25 00:34:34 | null | null | NAPISZ | [tryb —> ErrorIfE... | null | null | null | null | null | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Wyniki:
path |
---|
abfss://data@arca... |
Teraz sprawdzisz, czy tabela nie jest tabelą formatu różnicowego. Następnie przekonwertujesz tabelę na format różnicowy przy użyciu usługi Spark SQL i potwierdzisz, że została ona poprawnie przekonwertowana.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Wyniki:
Prawda
Aby uzyskać pełną dokumentację, zobacz stronę dokumentacji usługi Delta Lake
Aby uzyskać więcej informacji, zobacz Projekt usługi Delta Lake.