Zelfstudie: Delta Lake
In deze zelfstudie worden algemene Delta Lake-bewerkingen in Azure Databricks geïntroduceerd, waaronder de volgende:
- Een tabel maken.
- Upsert naar een tabel.
- Lezen uit een tabel.
- Tabelgeschiedenis weergeven.
- Een query uitvoeren op een eerdere versie van een tabel.
- Een tabel optimaliseren.
- Een Z-orderindex toevoegen.
- Vacuüm bestanden zonder verwijzing.
U kunt de voorbeeldcode Python, Scala en SQL in dit artikel uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-rekenresource, zoals een cluster. U kunt de SQL-code in dit artikel ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.
De brongegevens voorbereiden
Deze zelfstudie is afhankelijk van een gegevensset met de naam People 10 M. Het bevat 10 miljoen fictieve records die feiten bevatten over mensen, zoals voor- en achternamen, geboortedatum en salaris. In deze zelfstudie wordt ervan uitgegaan dat deze gegevensset zich in een Unity Catalog-volume bevindt dat is gekoppeld aan uw Azure Databricks-doelwerkruimte.
Ga als volgt te werk om de people 10 M-gegevensset voor deze zelfstudie op te halen:
- Ga naar de pagina Personen 10 M in Kaggle.
- Klik op Downloaden om een bestand met de naam
archive.zip
naar uw lokale computer te downloaden. - Pak het bestand uit met de naam
export.csv
van hetarchive.zip
bestand. Hetexport.csv
bestand bevat de gegevens voor deze zelfstudie.
Ga als volgt te werk om het export.csv
bestand naar het volume te uploaden:
- Klik in de zijbalk op Catalogus.
- Blader in Catalog Explorer naar het volume waar u het bestand wilt uploaden en open het
export.csv
. - Klik op Uploaden naar dit volume.
- Sleep en zet het neer, of blader naar en selecteer het
export.csv
bestand op uw lokale computer. - Klik op Uploaden.
Vervang in de volgende codevoorbeelden /Volumes/main/default/my-volume/export.csv
het pad naar het export.csv
bestand in het doelvolume.
Een tabel maken
Alle tabellen die in Azure Databricks zijn gemaakt, maken standaard gebruik van Delta Lake. Databricks raadt aan beheerde tabellen van Unity Catalog te gebruiken.
Vervang in het vorige codevoorbeeld en de volgende codevoorbeelden de tabelnaam main.default.people_10m
door uw doelcatalogus, schema en tabelnaam in Unity Catalog.
Notitie
Delta Lake is de standaardinstelling voor alle lees-, schrijf- en tabelopmaakopdrachten van Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Met de voorgaande bewerkingen maakt u een nieuwe beheerde tabel. Zie CREATE TABLE voor informatie over beschikbare opties wanneer u een Delta-tabel maakt.
In Databricks Runtime 13.3 LTS en hoger kunt u CREATE TABLE LIKE gebruiken om een nieuwe lege Delta-tabel te maken waarmee de schema- en tabeleigenschappen voor een bron-Delta-tabel worden gedupliceerd. Dit kan met name handig zijn bij het promoveren van tabellen uit een ontwikkelomgeving naar productie, zoals wordt weergegeven in het volgende codevoorbeeld:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Als u een lege tabel wilt maken, kunt u ook de DeltaTableBuilder
API in Delta Lake gebruiken voor Python en Scala. Vergeleken met equivalente DataFrameWriter-API's maken deze API's het gemakkelijker om aanvullende informatie op te geven, zoals kolomopmerkingen, tabeleigenschappen en gegenereerde kolommen.
Belangrijk
Deze functie is beschikbaar als openbare preview.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert naar een tabel
Als u een set updates en invoegingen wilt samenvoegen in een bestaande Delta-tabel, gebruikt u de DeltaTable.merge
methode voor Python en Scala en de INSTRUCTIE MERGE INTO voor SQL. In het volgende voorbeeld worden bijvoorbeeld gegevens uit de brontabel gebruikt en samengevoegd in de delta-doeltabel. Wanneer er een overeenkomende rij in beide tabellen is, werkt Delta Lake de gegevenskolom bij met behulp van de opgegeven expressie. Wanneer er geen overeenkomende rij is, voegt Delta Lake een nieuwe rij toe. Deze bewerking wordt een upsert genoemd.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Als u in SQL opgeeft *
, worden hiermee alle kolommen in de doeltabel bijgewerkt of ingevoegd, ervan uitgaande dat de brontabel dezelfde kolommen heeft als de doeltabel. Als de doeltabel niet dezelfde kolommen heeft, genereert de query een analysefout.
U moet een waarde opgeven voor elke kolom in uw tabel wanneer u een invoegbewerking uitvoert (bijvoorbeeld wanneer er geen overeenkomende rij in de bestaande gegevensset is). U hoeft echter niet alle waarden bij te werken.
Als u de resultaten wilt bekijken, voert u een query uit op de tabel.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Een tabel lezen
U opent gegevens in Delta-tabellen op basis van de tabelnaam of het tabelpad, zoals wordt weergegeven in de volgende voorbeelden:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Schrijven naar een tabel
Delta Lake gebruikt standaardsyntaxis voor het schrijven van gegevens naar tabellen.
Als u nieuwe gegevens atomisch wilt toevoegen aan een bestaande Delta-tabel, gebruikt u de toevoegmodus, zoals wordt weergegeven in de volgende voorbeelden:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Als u alle gegevens in een tabel wilt vervangen, gebruikt u de overschrijfmodus zoals in de volgende voorbeelden:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Een tabel bijwerken
U kunt gegevens bijwerken die overeenkomen met een predicaat in een Delta-tabel. Als u bijvoorbeeld in de voorbeeldtabel people_10m
een afkorting in de gender
kolom wilt wijzigen van M
of F
naar Male
of Female
, kunt u het volgende uitvoeren:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Verwijderen uit een tabel
U kunt gegevens verwijderen die overeenkomen met een predicaat uit een Delta-tabel. Als u bijvoorbeeld in de voorbeeldtabel people_10m
alle rijen wilt verwijderen die overeenkomen met personen met een waarde in de birthDate
kolom van voordien 1955
, kunt u het volgende uitvoeren:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Belangrijk
Verwijdering verwijdert de gegevens uit de nieuwste versie van de Delta-tabel, maar verwijdert deze niet uit de fysieke opslag totdat de oude versies expliciet zijn leeggezogen. Zie vacuüm voor meer informatie.
Tabelgeschiedenis weergeven
Als u de geschiedenis van een tabel wilt bekijken, gebruikt u de DeltaTable.history
methode voor Python en Scala en de instructie DESCRIBE HISTORY in SQL, die informatie over de herkomst biedt, waaronder de tabelversie, bewerking, gebruiker enzovoort, voor elke schrijfbewerking naar een tabel.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Een query uitvoeren op een eerdere versie van de tabel (tijdreizen)
Met Delta Lake-tijdreizen kunt u een query uitvoeren op een oudere momentopname van een Delta-tabel.
Als u een query wilt uitvoeren op een oudere versie van een tabel, geeft u de versie of tijdstempel van de tabel op. Als u bijvoorbeeld een query wilt uitvoeren op versie 0 of tijdstempel 2024-05-15T22:43:15.000+00:00Z
uit de voorgaande geschiedenis, gebruikt u het volgende:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Voor tijdstempels worden alleen datum- of tijdstempeltekenreeksen geaccepteerd, bijvoorbeeld"2024-05-15T22:43:15.000+00:00"
."2024-05-15 22:43:15"
Met DataFrameReader-opties kunt u een DataFrame maken op basis van een Delta-tabel die is vastgezet aan een specifieke versie of tijdstempel van de tabel, bijvoorbeeld:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Zie Werken met Delta Lake-tabelgeschiedenis voor meer informatie.
Een tabel optimaliseren
Nadat u meerdere wijzigingen in een tabel hebt uitgevoerd, hebt u mogelijk veel kleine bestanden. Als u de snelheid van leesquery's wilt verbeteren, kunt u de optimalisatiebewerking gebruiken om kleine bestanden samen te vouwen in grotere bestanden:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Z-volgorde op kolommen
Als u de leesprestaties verder wilt verbeteren, kunt u gerelateerde informatie in dezelfde set bestanden op z-volgorde plaatsen. Delta Lake-algoritmen voor het overslaan van gegevens maken gebruik van deze collocatie om de hoeveelheid gegevens die moet worden gelezen aanzienlijk te verminderen. Voor z-ordergegevens geeft u de kolommen op waarop u in de z-volgorde per bewerking wilt orden. Voer bijvoorbeeld het volgende uit om samen te vermenigvuldigen met gender
:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Zie De indeling gegevensbestand optimaliseren voor de volledige set opties die beschikbaar zijn bij het uitvoeren van de optimalisatiebewerking.
Momentopnamen opschonen met VACUUM
Delta Lake biedt isolatie van momentopnamen voor leesbewerkingen, wat betekent dat het veilig is om een optimalisatiebewerking uit te voeren, zelfs terwijl andere gebruikers of taken query's uitvoeren op de tabel. Uiteindelijk moet u echter oude momentopnamen opschonen. U kunt dit doen door de vacuümbewerking uit te voeren:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Zie Ongebruikte gegevensbestanden verwijderen met vacuüm voor meer informatie over het effectief gebruiken van de vacuümbewerking.