Kurz: Delta Lake
Tento kurz představuje běžné operace Delta Lake v Azure Databricks, včetně následujících:
- Vytvoření tabulky
- Upsertování do tabulky
- Čtení z tabulky
- Zobrazení historie tabulky
- Dotazování starší verze tabulky
- Optimalizace tabulky
- Přidání indexu Z-pořadí
- Úklid neodkazovaných souborů
Příklad kódu Pythonu, Scaly a SQL v tomto článku můžete spustit z poznámkového bloku připojeného k výpočetnímu prostředku Azure Databricks, jako je cluster. Kód SQL v tomto článku můžete spustit také z dotazu přidruženého k SQL Warehouse v Databricks SQL.
Příprava zdrojových dat
Tento kurz využívá datovou sadu s názvem Lidé 10 M. Obsahuje 10 milionů fiktivních záznamů, které obsahují fakta o lidech, jako jsou křestní jména a příjmení, datum narození a plat. Tento kurz předpokládá, že tato datová sada je ve svazku katalogu Unity, který je přidružený k cílovému pracovnímu prostoru Azure Databricks.
Pokud chcete získat datovou sadu People 10 M pro tento kurz, postupujte takto:
- Přejděte na stránku Lidé 10 M v Kaggle.
- Kliknutím na tlačítko Stáhnout stáhnete soubor s názvem
archive.zip
na místní počítač. - Extrahujte soubor s názvem
export.csv
zearchive.zip
souboru. Souborexport.csv
obsahuje data pro tento kurz.
Pokud chcete soubor nahrát export.csv
do svazku, postupujte takto:
- Na bočním panelu klikněte na Katalog.
- V Průzkumníku katalogu přejděte na svazek, do kterého chcete soubor nahrát, a otevřete ho
export.csv
. - Klikněte na Nahrát na tento svazek.
- Přetáhněte soubor na místním počítači nebo ho
export.csv
vyberte a vyberte. - Klikněte na tlačítko Odeslat.
V následujících příkladech kódu nahraďte /Volumes/main/default/my-volume/export.csv
cestou k export.csv
souboru ve vašem cílovém svazku.
Vytvoření tabulky
Všechny tabulky vytvořené v Azure Databricks ve výchozím nastavení používají Delta Lake. Databricks doporučuje používat spravované tabulky Katalogu Unity.
V předchozím příkladu kódu a v následujících příkladech kódu nahraďte název main.default.people_10m
tabulky cílovým třídílným katalogem, schématem a názvem tabulky v katalogu Unity.
Poznámka:
Delta Lake je výchozí nastavení pro všechny příkazy pro čtení, zápisy a vytváření tabulek v Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Předchozí operace vytvoří novou spravovanou tabulku. Informace o dostupných možnostech při vytváření tabulky Delta najdete v tématu CREATE TABLE.
Ve verzi Databricks Runtime 13.3 LTS a vyšší můžete pomocí příkazu CREATE TABLE LIKE vytvořit novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky pro zdrojovou tabulku Delta. To může být užitečné zejména při propagaci tabulek z vývojového prostředí do produkčního prostředí, jak je znázorněno v následujícím příkladu kódu:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
K vytvoření prázdné tabulky můžete také použít DeltaTableBuilder
rozhraní API v Delta Lake pro Python a Scala. V porovnání s ekvivalentními rozhraními API DataFrameWriter tato rozhraní API usnadňují zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.
Důležité
Tato funkce je ve verzi Public Preview.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Přenesení do tabulky
Ke sloučení sady aktualizací a vložení do existující tabulky Delta použijte metodu DeltaTable.merge
pro Python a Scala a příkaz MERGE INTO pro SQL. Následující příklad například přebírá data ze zdrojové tabulky a slučuje je do cílové tabulky Delta. Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek. Tato operace se označuje jako upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Pokud v SQL zadáte *
, tato aktualizace nebo vloží všechny sloupce v cílové tabulce za předpokladu, že zdrojová tabulka má stejné sloupce jako cílová tabulka. Pokud cílová tabulka nemá stejné sloupce, dotaz vyvolá chybu analýzy.
Při provádění operace vložení musíte zadat hodnotu pro každý sloupec v tabulce (například pokud v existující datové sadě neexistuje odpovídající řádek). Nemusíte ale aktualizovat všechny hodnoty.
Pokud chcete zobrazit výsledky, zadejte dotaz na tabulku.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Čtení tabulky
K datům v tabulkách Delta přistupujete podle názvu tabulky nebo cesty k tabulce, jak je znázorněno v následujících příkladech:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Zápis do tabulky
Delta Lake používá pro zápis dat do tabulek standardní syntaxi.
Pokud chcete do existující tabulky Delta přidat nová data atomicky, použijte režim připojení, jak je znázorněno v následujících příkladech:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Pokud chcete nahradit všechna data v tabulce, použijte režim přepsání jako v následujících příkladech:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Aktualizace tabulky
Data, která odpovídají predikátu v tabulce Delta, můžete aktualizovat. Například v ukázkové people_10m
tabulce můžete změnit zkratku gender
ve sloupci z M
nebo F
na Male
nebo Female
, můžete spustit následující:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Odstranění z tabulky
Data, která odpovídají predikátu, můžete odebrat z tabulky Delta. Například v ukázkové people_10m
tabulce můžete odstranit všechny řádky odpovídající lidem s hodnotou ve birthDate
sloupci před 1955
spuštěním následujícího příkazu:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Důležité
Odstranění odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Podrobnosti najdete v vakuu .
Zobrazení historie tabulek
Pokud chcete zobrazit historii tabulky, použijete metodu DeltaTable.history
pro Python a Scala a příkaz DESCRIBE HISTORY v SQL, který poskytuje informace o původu, včetně verze tabulky, operace, uživatele atd., pro každý zápis do tabulky.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Dotazování na starší verzi tabulky (časová cesta)
Časová cesta Delta Lake umožňuje dotazovat starší snímek tabulky Delta.
Pokud chcete zadat dotaz na starší verzi tabulky, zadejte její verzi nebo časové razítko. Pokud chcete například zadat dotaz na verzi 0 nebo časové razítko 2024-05-15T22:43:15.000+00:00Z
z předchozí historie, použijte následující:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
U časových razítek se přijímají pouze řetězce data nebo časového razítka, "2024-05-15T22:43:15.000+00:00"
například nebo "2024-05-15 22:43:15"
.
Možnosti třídy DataFrameReader umožňují vytvořit datový rámec z tabulky Delta, která je pevná pro konkrétní verzi nebo časové razítko tabulky, například:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Podrobnosti najdete v tématu Práce s historií tabulek Delta Lake.
Optimalizace tabulky
Po provedení několika změn v tabulce můžete mít velké množství malých souborů. Pokud chcete zvýšit rychlost dotazů na čtení, můžete pomocí operace optimalizace sbalit malé soubory do větších:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Pořadí vykreslování podle sloupců
Pokud chcete zvýšit výkon čtení, můžete sloučit související informace ve stejné sadě souborů řazením z. Algoritmy pro přeskočení dat Delta Lake používají tuto kolaci k výraznému snížení množství dat, která je potřeba číst. Chcete-li data pořadí vykreslování, zadáte sloupce, podle kterých se má pořadí podle operace provést. Pokud chcete například sloučit bajt, gender
spusťte:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Úplnou sadu možností dostupných při spuštění operace optimalizace najdete v tématu Optimalizace rozložení datového souboru.
Vyčištění snímků pomocí VACUUM
Delta Lake poskytuje izolaci snímků pro čtení, což znamená, že je bezpečné spustit operaci optimalizace i v případě, že se na tabulku dotazují jiní uživatelé nebo úlohy. Nakonec byste ale měli staré snímky vyčistit. Můžete to provést spuštěním operace vakua:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Podrobnosti o efektivním používání operace vakua naleznete v tématu Odebrání nepoužívaných datových souborů s vakuem.