Samouczek: usługa Delta Lake
W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:
- Utwórz table.
- upsert do table.
- odczyt z table.
- Wyświetl historię table.
- Wykonaj zapytanie o wcześniejszą wersję table.
- Optimize table.
- Dodawanie indeksu zamówienia Z.
- Vacuum niezareferowane pliki.
Przykładowy kod Python, Scala i SQL można uruchomić z poziomu notesu dołączonego do zasobu obliczeniowego usługi Azure Databricks, takiego jak klaster. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.
Przygotowywanie danych źródłowych
Ten samouczek opiera się na zestawie danych o nazwie People 10 M. Zawiera 10 milionów fikcyjnych zapisów, które zawierają fakty o ludziach, takich jak imię i nazwisko, data urodzenia i wynagrodzenie. W tym samouczku założono, że ten zestaw danych znajduje się w woluminie Unity Catalog, który jest skojarzony z docelowym środowiskiem pracy usługi Azure Databricks.
Aby użyć zestawu danych get dla 10 milionów osób w tym samouczku, wykonaj następujące czynności:
- Przejdź do strony Ludzie 10 M w Kaggle.
- Kliknij przycisk Pobierz , aby pobrać plik o nazwie
archive.zip
na komputerze lokalnym. - Wyodrębnij plik o nazwie
export.csv
zarchive.zip
pliku. Plikexport.csv
zawiera dane na potrzeby tego samouczka.
Aby przekazać export.csv
plik do woluminu, wykonaj następujące czynności:
- Na pasku bocznym kliknij pozycję Catalog.
- W programie Catalog Explorerprzejdź do woluminu where i otwórz go, aby przesłać plik
export.csv
. - Kliknij pozycję Przekaż do tego woluminu.
- Przeciągnij i upuść lub przejdź do i selectpliku
export.csv
na komputerze lokalnym. - Kliknij polecenie Przekaż.
W poniższych przykładach kodu zastąp /Volumes/main/default/my-volume/export.csv
ciąg ścieżką do export.csv
pliku w woluminie docelowym.
Tworzenie table
Wszystkie tables utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake. Usługa Databricks zaleca używanie Catalog zarządzanych tablesaparatu Unity.
W poprzednim przykładzie i poniższych przykładach kodu zastąp nazwę table i main.default.people_10m
docelowym trójskładowym catalog, schemaoraz table w środowisku Unity Catalog.
Uwaga
Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i table tworzenia usługi Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Poprzednie operacje tworzą nowy zarządzany table. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia Delta table, zobacz CREATE TABLE.
W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć CREATE TABLE LIKE, aby utworzyć nową pustą Deltę table, która duplikuje właściwości schema i table źródłowej Delty table. Może to być szczególnie przydatne podczas promowania tables ze środowiska deweloperskiego do środowiska produkcyjnego, jak pokazano w poniższym przykładzie kodu:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Aby utworzyć pusty table, możesz również użyć interfejsu API DeltaTableBuilder
w usłudze Delta Lake dla Python i Scala. W porównaniu z równoważnymi interfejsami API DataFrameWriter, te interfejsy API ułatwiają określanie dodatkowych informacji, takich jak komentarze column, właściwości table i wygenerowane columns.
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
operacja upsert do table
Aby scalić set aktualizacji i wstawiania do istniejącego Delta table, należy użyć metody DeltaTable.merge
dla Python i Scalaoraz instrukcji MERGE INTO dla SQL. Na przykład, poniższy przykład pobiera dane ze źródła table i scala je z celem Delta table. Jeśli w obu tablesistnieje pasujący wiersz, usługa Delta Lake aktualizuje dane column przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
W SQL, jeśli określisz *
, to aktualizuje lub wstawia wszystkie columns do docelowego table, przy założeniu, że źródłowy table ma taką samą columns jak docelowy table. Jeśli docelowy table nie ma tego samego columns, zapytanie zgłasza błąd analizy.
Należy określić wartość dla każdego column w table podczas wykonywania operacji insert (na przykład w przypadku braku pasującego wiersza w istniejącym zestawie danych). Nie trzeba jednak update wszystkich values.
Aby wyświetlić wyniki, wykonaj zapytanie dotyczące table.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Przeczytaj table
Dostęp do danych w usłudze Delta tables można uzyskać według nazwy table lub ścieżki table, jak pokazano w poniższych przykładach:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Napisz do table
Delta Lake używa standardowej składni do zapisywania danych w tables.
Aby atomowo dodać nowe dane do istniejącej Delta table, użyj trybu dołączania danych, jak to pokazano w poniższych przykładach:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Aby zastąpić wszystkie dane w table, użyj trybu zastępowania, jak w następujących przykładach:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Update a table
Możesz update dane, które odpowiadają predykatowi w tabeli Delta table. Na przykład w przykładzie people_10m
table, aby zmienić skrót w gender
column z M
lub F
na Male
lub Female
, można uruchomić następujące polecenie:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Usuń z table
Możesz remove dane zgodne z predykatem z Delty table. Na przykład w przykładzie people_10m
table, aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDate
column przed 1955
, można uruchomić następujące polecenie:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Ważne
Usunięcie usuwa dane z najnowszej wersji table delty, ale nie remove ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Aby uzyskać szczegółowe informacje, zobacz vacuum.
Wyświetlanie historii table
Aby wyświetlić historię table, należy użyć metody DeltaTable.history
dla języka Python i Scalaoraz instrukcji DESCRIBE HISTORY w języku SQL, która zawiera informacje o pochodzeniu, w tym wersję table, operację, użytkownika itd., dla każdego zapisu w table.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Wykonywanie zapytań o starszą wersję table (podróż w czasie)
Funkcja podróży w czasie w Delta Lake umożliwia wykonywanie zapytań do starszej migawki Delta table.
Aby wysłać zapytanie na starszą wersję table, określ wersję tablelub znacznik czasowy. Aby na przykład wysłać zapytanie o wersję 0 lub znacznik 2024-05-15T22:43:15.000+00:00Z
czasu z poprzedniej historii, użyj następującego polecenia:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2024-05-15T22:43:15.000+00:00"
lub "2024-05-15 22:43:15"
.
Opcje elementu DataFrameReader umożliwiają utworzenie DataFrame na podstawie table Delta, która jest przypisana do określonej wersji lub znacznika czasu table, na przykład:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Aby uzyskać szczegółowe informacje, zobacz Praca z Delta Lake table historią.
Optimize a table
Po wykonaniu wielu zmian w table, możesz mieć wiele małych plików. Aby zwiększyć szybkość odczytu zapytań, możesz użyć operacji optimize, aby zwinąć małe pliki na większe:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Porządek Z według columns
Aby jeszcze bardziej zwiększyć wydajność odczytu, można połączyć powiązane informacje w tym samym set plików przez porządkowanie z. Algorytmy pomijania danych usługi Delta Lake używają tej kolokacji, aby znacznie zmniejszyć ilość danych, które należy odczytać. Aby uporządkować dane w porządku z, należy określić columns, według którego ma być przeprowadzona operacja z-porządkowania. Aby na przykład przeprowadzić sortowanie według gender
polecenia , uruchom polecenie:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Aby uzyskać pełną set opcji dostępnych podczas uruchamiania operacji optimize, zobacz Optimize układ pliku danych.
Czyszczenie migawek za pomocą polecenia VACUUM
Usługa Delta Lake zapewnia izolację migawek podczas odczytu, co oznacza, że można bezpiecznie uruchomić operację optimize nawet wtedy, gdy inni użytkownicy lub zadania wykonują zapytania do table. W końcu jednak należy wyczyścić stare migawki. Można to zrobić, uruchamiając operację vacuum:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Aby uzyskać szczegółowe informacje na temat efektywnego wykorzystania operacji vacuum, zapoznaj się z nieużywanymi plikami danych Removevacuum.