Dela via


Självstudie: Delta Lake

Den här självstudien beskriver vanliga Delta Lake-åtgärder i Azure Databricks, inklusive följande:

Du kan köra python-, scala- och SQL-exempelkoden i den här artikeln från en notebook-fil som är kopplad till en Azure Databricks-beräkningsresurs, till exempel ett kluster. Du kan också köra SQL-koden i den här artikeln från en fråga som är associerad med ett SQL-lager i Databricks SQL.

Förbereda källdata

Den här självstudien förlitar sig på en datauppsättning med namnet People 10 M. Den innehåller 10 miljoner fiktiva register som innehåller fakta om människor, som för- och efternamn, födelsedatum och lön. Den här självstudien förutsätter att den här datamängden finns i en Unity Catalog-volym som är associerad med din Azure Databricks-målarbetsyta.

Gör följande för att hämta datauppsättningen People 10 M för den här självstudien:

  1. Gå till sidan Personer 10 M i Kaggle.
  2. Klicka på Ladda ned för att ladda ned en fil med namnet archive.zip till den lokala datorn.
  3. Extrahera filen med namnet export.csv från archive.zip filen. Filen export.csv innehåller data för den här självstudien.

Gör följande för att ladda upp export.csv filen till volymen:

  1. Klicka på Katalog i sidopanelen.
  2. I Katalogutforskaren bläddrar du till och öppnar volymen där du vill ladda upp export.csv filen.
  3. Klicka på Ladda upp till den här volymen.
  4. Dra och släpp eller bläddra till och välj filen på den export.csv lokala datorn.
  5. Klicka på Överför.

I följande kodexempel ersätter du /Volumes/main/default/my-volume/export.csv med sökvägen till export.csv filen i målvolymen.

Skapa en tabell

Alla tabeller som skapats i Azure Databricks använder Delta Lake som standard. Databricks rekommenderar att du använder hanterade Unity Catalog-tabeller.

I föregående kodexempel och följande kodexempel ersätter du tabellnamnet main.default.people_10m med målkatalogen, schemat och tabellnamnet i Unity Catalog.

Kommentar

Delta Lake är standardvärdet för alla kommandon för läsning, skrivningar och tabellskapande i Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Föregående åtgärder skapar en ny hanterad tabell. Information om tillgängliga alternativ när du skapar en Delta-tabell finns i SKAPA TABELL.

I Databricks Runtime 13.3 LTS och senare kan du använda CREATE TABLE LIKE för att skapa en ny tom Delta-tabell som duplicerar schema- och tabellegenskaperna för en Delta-källtabell. Detta kan vara särskilt användbart när du befordrar tabeller från en utvecklingsmiljö till produktion, vilket visas i följande kodexempel:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Om du vill skapa en tom tabell kan du också använda API:et DeltaTableBuilder i Delta Lake för Python och Scala. Jämfört med motsvarande DataFrameWriter-API:er gör dessa API:er det enklare att ange ytterligare information som kolumnkommenterar, tabellegenskaper och genererade kolumner.

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert till en tabell

Om du vill sammanfoga en uppsättning uppdateringar och infogningar i en befintlig Delta-tabell använder DeltaTable.merge du metoden för Python och Scala och MERGE INTO-instruktionen för SQL. Följande exempel tar till exempel data från källtabellen och sammanfogar dem till Delta-måltabellen. När det finns en matchande rad i båda tabellerna uppdaterar Delta Lake datakolumnen med det angivna uttrycket. När det inte finns någon matchande rad lägger Delta Lake till en ny rad. Den här åtgärden kallas för en upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Om du anger *i SQL uppdateras eller infogas alla kolumner i måltabellen, förutsatt att källtabellen har samma kolumner som måltabellen. Om måltabellen inte har samma kolumner utlöser frågan ett analysfel.

Du måste ange ett värde för varje kolumn i tabellen när du utför en infogningsåtgärd (till exempel när det inte finns någon matchande rad i den befintliga datauppsättningen). Du behöver dock inte uppdatera alla värden.

Om du vill se resultatet frågar du tabellen.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Läsa en tabell

Du får åtkomst till data i Delta-tabeller efter tabellnamnet eller tabellsökvägen, enligt följande exempel:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Skriva till en tabell

Delta Lake använder standardsyntax för att skriva data till tabeller.

Om du vill lägga till nya data atomiskt i en befintlig Delta-tabell använder du tilläggsläget enligt följande exempel:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Om du vill ersätta alla data i en tabell använder du överskrivningsläget som i följande exempel:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Uppdatera en tabell

Du kan uppdatera data som matchar ett predikat i en Delta-tabell. Om du till exempel people_10m vill ändra en förkortning i gender kolumnen från M eller F till Male eller Femalekan du köra följande:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Ta bort från en tabell

Du kan ta bort data som matchar ett predikat från en Delta-tabell. Om du till exempel people_10m vill ta bort alla rader som motsvarar personer med ett värde i birthDate kolumnen från före 1955kan du köra följande:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Viktigt!

Borttagning tar bort data från den senaste versionen av Delta-tabellen, men tar inte bort dem från den fysiska lagringen förrän de gamla versionerna uttryckligen har dammsugats. Mer information finns i vakuum .

Visa tabellhistorik

Om du vill visa historiken för en tabell använder DeltaTable.history du metoden för Python och Scala och instruktionen DESCRIBE HISTORY i SQL, som innehåller härkomstinformation, inklusive tabellversion, åtgärd, användare och så vidare, för varje skrivning till en tabell.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Fråga en tidigare version av tabellen (tidsresa)

Med Delta Lake-tidsresor kan du fråga en äldre ögonblicksbild av en Delta-tabell.

Om du vill köra frågor mot en äldre version av en tabell anger du tabellens version eller tidsstämpel. Om du till exempel vill fråga version 0 eller tidsstämpel 2024-05-15T22:43:15.000+00:00Z från föregående historik använder du följande:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

För tidsstämplar accepteras endast datum- eller tidsstämpelsträngar, till exempel "2024-05-15T22:43:15.000+00:00" eller "2024-05-15 22:43:15".

Med DataFrameReader-alternativ kan du skapa en DataFrame från en Delta-tabell som är fast i en viss version eller tidsstämpel för tabellen, till exempel:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Mer information finns i Arbeta med Delta Lake-tabellhistorik.

Optimera en tabell

När du har gjort flera ändringar i en tabell kan det finnas många små filer. Om du vill förbättra läsfrågornas hastighet kan du använda optimeringsåtgärden för att komprimera små filer till större:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Z-ordning efter kolumner

Om du vill förbättra läsprestanda ytterligare kan du samla in relaterad information i samma uppsättning filer genom z-beställning. Delta Lake-algoritmer för datahopp använder den här sorteringen för att dramatiskt minska mängden data som behöver läsas. Om du vill z-beställa data anger du de kolumner som ska beställas i z-ordning efter åtgärd. Om du till exempel vill samverka efter genderkör du:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Fullständig uppsättning tillgängliga alternativ när du kör optimeringsåtgärden finns i Optimera datafillayout.

Rensa ögonblicksbilder med VACUUM

Delta Lake tillhandahåller ögonblicksbildisolering för läsningar, vilket innebär att det är säkert att köra en optimeringsåtgärd även när andra användare eller jobb kör frågor mot tabellen. Men så småningom bör du rensa gamla ögonblicksbilder. Du kan göra detta genom att köra vakuumåtgärden:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Mer information om hur du använder vakuumåtgärden finns i Ta bort oanvända datafiler med vakuum.