Självstudie: Delta Lake
Den här självstudien beskriver vanliga Delta Lake-åtgärder i Azure Databricks, inklusive följande:
- Skapa en tabell.
- Upsert till en tabell.
- Läs från en tabell.
- Visa tabellhistorik.
- Fråga en tidigare version av en tabell.
- Optimera en tabell.
- Lägg till ett Z-orderindex.
- Dammsug oupphörliga filer.
Du kan köra python-, scala- och SQL-exempelkoden i den här artikeln från en notebook-fil som är kopplad till en Azure Databricks-beräkningsresurs, till exempel ett kluster. Du kan också köra SQL-koden i den här artikeln från en fråga som är associerad med ett SQL-lager i Databricks SQL.
Förbereda källdata
Den här självstudien förlitar sig på en datauppsättning med namnet People 10 M. Den innehåller 10 miljoner fiktiva register som innehåller fakta om människor, som för- och efternamn, födelsedatum och lön. Den här självstudien förutsätter att den här datamängden finns i en Unity Catalog-volym som är associerad med din Azure Databricks-målarbetsyta.
Gör följande för att hämta datauppsättningen People 10 M för den här självstudien:
- Gå till sidan Personer 10 M i Kaggle.
- Klicka på Ladda ned för att ladda ned en fil med namnet
archive.zip
till den lokala datorn. - Extrahera filen med namnet
export.csv
frånarchive.zip
filen. Filenexport.csv
innehåller data för den här självstudien.
Gör följande för att ladda upp export.csv
filen till volymen:
- Klicka på Katalog i sidopanelen.
- I Katalogutforskaren bläddrar du till och öppnar volymen där du vill ladda upp
export.csv
filen. - Klicka på Ladda upp till den här volymen.
- Dra och släpp eller bläddra till och välj filen på den
export.csv
lokala datorn. - Klicka på Överför.
I följande kodexempel ersätter du /Volumes/main/default/my-volume/export.csv
med sökvägen till export.csv
filen i målvolymen.
Skapa en tabell
Alla tabeller som skapats i Azure Databricks använder Delta Lake som standard. Databricks rekommenderar att du använder hanterade Unity Catalog-tabeller.
I föregående kodexempel och följande kodexempel ersätter du tabellnamnet main.default.people_10m
med målkatalogen, schemat och tabellnamnet i Unity Catalog.
Kommentar
Delta Lake är standardvärdet för alla kommandon för läsning, skrivningar och tabellskapande i Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Föregående åtgärder skapar en ny hanterad tabell. Information om tillgängliga alternativ när du skapar en Delta-tabell finns i SKAPA TABELL.
I Databricks Runtime 13.3 LTS och senare kan du använda CREATE TABLE LIKE för att skapa en ny tom Delta-tabell som duplicerar schema- och tabellegenskaperna för en Delta-källtabell. Detta kan vara särskilt användbart när du befordrar tabeller från en utvecklingsmiljö till produktion, vilket visas i följande kodexempel:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Om du vill skapa en tom tabell kan du också använda API:et DeltaTableBuilder
i Delta Lake för Python och Scala. Jämfört med motsvarande DataFrameWriter-API:er gör dessa API:er det enklare att ange ytterligare information som kolumnkommenterar, tabellegenskaper och genererade kolumner.
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert till en tabell
Om du vill sammanfoga en uppsättning uppdateringar och infogningar i en befintlig Delta-tabell använder DeltaTable.merge
du metoden för Python och Scala och MERGE INTO-instruktionen för SQL. Följande exempel tar till exempel data från källtabellen och sammanfogar dem till Delta-måltabellen. När det finns en matchande rad i båda tabellerna uppdaterar Delta Lake datakolumnen med det angivna uttrycket. När det inte finns någon matchande rad lägger Delta Lake till en ny rad. Den här åtgärden kallas för en upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Om du anger *
i SQL uppdateras eller infogas alla kolumner i måltabellen, förutsatt att källtabellen har samma kolumner som måltabellen. Om måltabellen inte har samma kolumner utlöser frågan ett analysfel.
Du måste ange ett värde för varje kolumn i tabellen när du utför en infogningsåtgärd (till exempel när det inte finns någon matchande rad i den befintliga datauppsättningen). Du behöver dock inte uppdatera alla värden.
Om du vill se resultatet frågar du tabellen.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Läsa en tabell
Du får åtkomst till data i Delta-tabeller efter tabellnamnet eller tabellsökvägen, enligt följande exempel:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Skriva till en tabell
Delta Lake använder standardsyntax för att skriva data till tabeller.
Om du vill lägga till nya data atomiskt i en befintlig Delta-tabell använder du tilläggsläget enligt följande exempel:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Om du vill ersätta alla data i en tabell använder du överskrivningsläget som i följande exempel:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Uppdatera en tabell
Du kan uppdatera data som matchar ett predikat i en Delta-tabell. Om du till exempel people_10m
vill ändra en förkortning i gender
kolumnen från M
eller F
till Male
eller Female
kan du köra följande:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Ta bort från en tabell
Du kan ta bort data som matchar ett predikat från en Delta-tabell. Om du till exempel people_10m
vill ta bort alla rader som motsvarar personer med ett värde i birthDate
kolumnen från före 1955
kan du köra följande:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Viktigt!
Borttagning tar bort data från den senaste versionen av Delta-tabellen, men tar inte bort dem från den fysiska lagringen förrän de gamla versionerna uttryckligen har dammsugats. Mer information finns i vakuum .
Visa tabellhistorik
Om du vill visa historiken för en tabell använder DeltaTable.history
du metoden för Python och Scala och instruktionen DESCRIBE HISTORY i SQL, som innehåller härkomstinformation, inklusive tabellversion, åtgärd, användare och så vidare, för varje skrivning till en tabell.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Fråga en tidigare version av tabellen (tidsresa)
Med Delta Lake-tidsresor kan du fråga en äldre ögonblicksbild av en Delta-tabell.
Om du vill köra frågor mot en äldre version av en tabell anger du tabellens version eller tidsstämpel. Om du till exempel vill fråga version 0 eller tidsstämpel 2024-05-15T22:43:15.000+00:00Z
från föregående historik använder du följande:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
För tidsstämplar accepteras endast datum- eller tidsstämpelsträngar, till exempel "2024-05-15T22:43:15.000+00:00"
eller "2024-05-15 22:43:15"
.
Med DataFrameReader-alternativ kan du skapa en DataFrame från en Delta-tabell som är fast i en viss version eller tidsstämpel för tabellen, till exempel:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Mer information finns i Arbeta med Delta Lake-tabellhistorik.
Optimera en tabell
När du har gjort flera ändringar i en tabell kan det finnas många små filer. Om du vill förbättra läsfrågornas hastighet kan du använda optimeringsåtgärden för att komprimera små filer till större:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Z-ordning efter kolumner
Om du vill förbättra läsprestanda ytterligare kan du samla in relaterad information i samma uppsättning filer genom z-beställning. Delta Lake-algoritmer för datahopp använder den här sorteringen för att dramatiskt minska mängden data som behöver läsas. Om du vill z-beställa data anger du de kolumner som ska beställas i z-ordning efter åtgärd. Om du till exempel vill samverka efter gender
kör du:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Fullständig uppsättning tillgängliga alternativ när du kör optimeringsåtgärden finns i Optimera datafillayout.
Rensa ögonblicksbilder med VACUUM
Delta Lake tillhandahåller ögonblicksbildisolering för läsningar, vilket innebär att det är säkert att köra en optimeringsåtgärd även när andra användare eller jobb kör frågor mot tabellen. Men så småningom bör du rensa gamla ögonblicksbilder. Du kan göra detta genom att köra vakuumåtgärden:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Mer information om hur du använder vakuumåtgärden finns i Ta bort oanvända datafiler med vakuum.