Delen via


Zelfstudie: Delta Lake

In deze zelfstudie worden algemene Delta Lake-bewerkingen in Azure Databricks geïntroduceerd, waaronder de volgende:

U kunt de voorbeeldcode Python, Scala en SQL in dit artikel uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-rekenresource, zoals een cluster. U kunt de SQL-code in dit artikel ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.

De brongegevens voorbereiden

Deze zelfstudie is afhankelijk van een gegevensset met de naam People 10 M. Het bevat 10 miljoen fictieve records die feiten bevatten over mensen, zoals voor- en achternamen, geboortedatum en salaris. In deze zelfstudie wordt ervan uitgegaan dat deze gegevensset zich in een Unity Catalog-volume bevindt dat is gekoppeld aan uw Azure Databricks-doelwerkruimte.

Ga als volgt te werk om de people 10 M-gegevensset voor deze zelfstudie op te halen:

  1. Ga naar de pagina Personen 10 M in Kaggle.
  2. Klik op Downloaden om een bestand met de naam archive.zip naar uw lokale computer te downloaden.
  3. Pak het bestand uit met de naam export.csv van het archive.zip bestand. Het export.csv bestand bevat de gegevens voor deze zelfstudie.

Ga als volgt te werk om het export.csv bestand naar het volume te uploaden:

  1. Klik in de zijbalk op Catalogus.
  2. Blader in Catalog Explorer naar het volume waar u het bestand wilt uploaden en open hetexport.csv.
  3. Klik op Uploaden naar dit volume.
  4. Sleep en zet het neer, of blader naar en selecteer het export.csv bestand op uw lokale computer.
  5. Klik op Uploaden.

Vervang in de volgende codevoorbeelden /Volumes/main/default/my-volume/export.csv het pad naar het export.csv bestand in het doelvolume.

Een tabel maken

Alle tabellen die in Azure Databricks zijn gemaakt, maken standaard gebruik van Delta Lake. Databricks raadt aan beheerde tabellen van Unity Catalog te gebruiken.

Vervang in het vorige codevoorbeeld en de volgende codevoorbeelden de tabelnaam main.default.people_10m door uw doelcatalogus, schema en tabelnaam in Unity Catalog.

Notitie

Delta Lake is de standaardinstelling voor alle lees-, schrijf- en tabelopmaakopdrachten van Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Met de voorgaande bewerkingen maakt u een nieuwe beheerde tabel. Zie CREATE TABLE voor informatie over beschikbare opties wanneer u een Delta-tabel maakt.

In Databricks Runtime 13.3 LTS en hoger kunt u CREATE TABLE LIKE gebruiken om een nieuwe lege Delta-tabel te maken waarmee de schema- en tabeleigenschappen voor een bron-Delta-tabel worden gedupliceerd. Dit kan met name handig zijn bij het promoveren van tabellen uit een ontwikkelomgeving naar productie, zoals wordt weergegeven in het volgende codevoorbeeld:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Als u een lege tabel wilt maken, kunt u ook de DeltaTableBuilder API in Delta Lake gebruiken voor Python en Scala. Vergeleken met equivalente DataFrameWriter-API's maken deze API's het gemakkelijker om aanvullende informatie op te geven, zoals kolomopmerkingen, tabeleigenschappen en gegenereerde kolommen.

Belangrijk

Deze functie is beschikbaar als openbare preview.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert naar een tabel

Als u een set updates en invoegingen wilt samenvoegen in een bestaande Delta-tabel, gebruikt u de DeltaTable.merge methode voor Python en Scala en de INSTRUCTIE MERGE INTO voor SQL. In het volgende voorbeeld worden bijvoorbeeld gegevens uit de brontabel gebruikt en samengevoegd in de delta-doeltabel. Wanneer er een overeenkomende rij in beide tabellen is, werkt Delta Lake de gegevenskolom bij met behulp van de opgegeven expressie. Wanneer er geen overeenkomende rij is, voegt Delta Lake een nieuwe rij toe. Deze bewerking wordt een upsert genoemd.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Als u in SQL opgeeft *, worden hiermee alle kolommen in de doeltabel bijgewerkt of ingevoegd, ervan uitgaande dat de brontabel dezelfde kolommen heeft als de doeltabel. Als de doeltabel niet dezelfde kolommen heeft, genereert de query een analysefout.

U moet een waarde opgeven voor elke kolom in uw tabel wanneer u een invoegbewerking uitvoert (bijvoorbeeld wanneer er geen overeenkomende rij in de bestaande gegevensset is). U hoeft echter niet alle waarden bij te werken.

Als u de resultaten wilt bekijken, voert u een query uit op de tabel.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Een tabel lezen

U opent gegevens in Delta-tabellen op basis van de tabelnaam of het tabelpad, zoals wordt weergegeven in de volgende voorbeelden:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Schrijven naar een tabel

Delta Lake gebruikt standaardsyntaxis voor het schrijven van gegevens naar tabellen.

Als u nieuwe gegevens atomisch wilt toevoegen aan een bestaande Delta-tabel, gebruikt u de toevoegmodus, zoals wordt weergegeven in de volgende voorbeelden:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Als u alle gegevens in een tabel wilt vervangen, gebruikt u de overschrijfmodus zoals in de volgende voorbeelden:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Een tabel bijwerken

U kunt gegevens bijwerken die overeenkomen met een predicaat in een Delta-tabel. Als u bijvoorbeeld in de voorbeeldtabel people_10m een afkorting in de gender kolom wilt wijzigen van M of F naar Male of Female, kunt u het volgende uitvoeren:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Verwijderen uit een tabel

U kunt gegevens verwijderen die overeenkomen met een predicaat uit een Delta-tabel. Als u bijvoorbeeld in de voorbeeldtabel people_10m alle rijen wilt verwijderen die overeenkomen met personen met een waarde in de birthDate kolom van voordien 1955, kunt u het volgende uitvoeren:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Belangrijk

Verwijdering verwijdert de gegevens uit de nieuwste versie van de Delta-tabel, maar verwijdert deze niet uit de fysieke opslag totdat de oude versies expliciet zijn leeggezogen. Zie vacuüm voor meer informatie.

Tabelgeschiedenis weergeven

Als u de geschiedenis van een tabel wilt bekijken, gebruikt u de DeltaTable.history methode voor Python en Scala en de instructie DESCRIBE HISTORY in SQL, die informatie over de herkomst biedt, waaronder de tabelversie, bewerking, gebruiker enzovoort, voor elke schrijfbewerking naar een tabel.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Een query uitvoeren op een eerdere versie van de tabel (tijdreizen)

Met Delta Lake-tijdreizen kunt u een query uitvoeren op een oudere momentopname van een Delta-tabel.

Als u een query wilt uitvoeren op een oudere versie van een tabel, geeft u de versie of tijdstempel van de tabel op. Als u bijvoorbeeld een query wilt uitvoeren op versie 0 of tijdstempel 2024-05-15T22:43:15.000+00:00Z uit de voorgaande geschiedenis, gebruikt u het volgende:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Voor tijdstempels worden alleen datum- of tijdstempeltekenreeksen geaccepteerd, bijvoorbeeld"2024-05-15T22:43:15.000+00:00"."2024-05-15 22:43:15"

Met DataFrameReader-opties kunt u een DataFrame maken op basis van een Delta-tabel die is vastgezet aan een specifieke versie of tijdstempel van de tabel, bijvoorbeeld:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Zie Werken met Delta Lake-tabelgeschiedenis voor meer informatie.

Een tabel optimaliseren

Nadat u meerdere wijzigingen in een tabel hebt uitgevoerd, hebt u mogelijk veel kleine bestanden. Als u de snelheid van leesquery's wilt verbeteren, kunt u de optimalisatiebewerking gebruiken om kleine bestanden samen te vouwen in grotere bestanden:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Z-volgorde op kolommen

Als u de leesprestaties verder wilt verbeteren, kunt u gerelateerde informatie in dezelfde set bestanden op z-volgorde plaatsen. Delta Lake-algoritmen voor het overslaan van gegevens maken gebruik van deze collocatie om de hoeveelheid gegevens die moet worden gelezen aanzienlijk te verminderen. Voor z-ordergegevens geeft u de kolommen op waarop u in de z-volgorde per bewerking wilt orden. Voer bijvoorbeeld het volgende uit om samen te vermenigvuldigen met gender:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Zie De indeling gegevensbestand optimaliseren voor de volledige set opties die beschikbaar zijn bij het uitvoeren van de optimalisatiebewerking.

Momentopnamen opschonen met VACUUM

Delta Lake biedt isolatie van momentopnamen voor leesbewerkingen, wat betekent dat het veilig is om een optimalisatiebewerking uit te voeren, zelfs terwijl andere gebruikers of taken query's uitvoeren op de tabel. Uiteindelijk moet u echter oude momentopnamen opschonen. U kunt dit doen door de vacuümbewerking uit te voeren:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Zie Ongebruikte gegevensbestanden verwijderen met vacuüm voor meer informatie over het effectief gebruiken van de vacuümbewerking.