Samouczek: usługa Delta Lake
W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:
- Tworzenie tabeli.
- Wykonanie operacji upsert do tabeli.
- Odczyt z tabeli.
- Wyświetlanie historii tabeli.
- Wykonywanie zapytań względem wcześniejszej wersji tabeli.
- Optymalizowanie tabeli.
- Dodawanie indeksu zamówienia Z.
- Opróżnianie plików bez odniesienia.
Przykładowy kod Python, Scala i SQL można uruchomić z poziomu notesu dołączonego do zasobu obliczeniowego usługi Azure Databricks, takiego jak klaster. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.
Przygotowywanie danych źródłowych
Ten samouczek opiera się na zestawie danych o nazwie People 10 M. Zawiera 10 milionów fikcyjnych zapisów, które zawierają fakty o ludziach, takich jak imię i nazwisko, data urodzenia i wynagrodzenie. W tym samouczku założono, że ten zestaw danych znajduje się w woluminie wykazu aparatu Unity skojarzonym z docelowym obszarem roboczym usługi Azure Databricks.
Aby uzyskać zestaw danych Osób 10 M dla tego samouczka, wykonaj następujące czynności:
- Przejdź do strony Ludzie 10 M w Kaggle.
- Kliknij przycisk Pobierz , aby pobrać plik o nazwie
archive.zip
na komputerze lokalnym. - Wyodrębnij plik o nazwie
export.csv
zarchive.zip
pliku. Plikexport.csv
zawiera dane na potrzeby tego samouczka.
Aby przekazać export.csv
plik do woluminu, wykonaj następujące czynności:
- Na pasku bocznym kliknij pozycję Wykaz.
- W Eksploratorze wykazu przejdź do i otwórz wolumin, w którym chcesz przekazać
export.csv
plik. - Kliknij pozycję Przekaż do tego woluminu.
- Przeciągnij i upuść lub przejdź do i wybierz
export.csv
plik na komputerze lokalnym. - Kliknij polecenie Przekaż.
W poniższych przykładach kodu zastąp /Volumes/main/default/my-volume/export.csv
ciąg ścieżką do export.csv
pliku w woluminie docelowym.
Tworzenie tabeli
Wszystkie tabele utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake. Usługa Databricks zaleca używanie tabel zarządzanych w wykazie aparatu Unity.
W poprzednim przykładzie kodu i poniższych przykładach kodu zastąp nazwę tabeli docelowym main.default.people_10m
katalogiem trzyczęściowym, schematem i nazwą tabeli w wykazie aparatu Unity.
Uwaga
Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i tworzenia tabel w usłudze Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Poprzednie operacje tworzą nową zarządzaną tabelę. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia tabeli delty, zobacz CREATE TABLE (TWORZENIE TABELI).
W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć polecenia CREATE TABLE LIKE , aby utworzyć nową pustą tabelę delty, która duplikuje właściwości schematu i tabeli źródłowej tabeli delty. Może to być szczególnie przydatne podczas promowania tabel ze środowiska deweloperskiego do środowiska produkcyjnego, jak pokazano w poniższym przykładzie kodu:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Aby utworzyć pustą tabelę, możesz również użyć interfejsu DeltaTableBuilder
API w usłudze Delta Lake dla języków Python i Scala. W porównaniu z równoważnymi interfejsami API elementu DataFrameWriter te interfejsy API ułatwiają określanie dodatkowych informacji, takich jak komentarze kolumn, właściwości tabeli i wygenerowane kolumny.
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert do tabeli
Aby scalić zestaw aktualizacji i wstawiania do istniejącej tabeli delty, należy użyć DeltaTable.merge
metody dla języków Python i Scala oraz instrukcji MERGE INTO dla języka SQL. Na przykład poniższy przykład pobiera dane z tabeli źródłowej i scala je z docelową tabelą delty. Jeśli w obu tabelach istnieje pasujący wiersz, usługa Delta Lake aktualizuje kolumnę danych przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
W programie SQL, jeśli określisz *
wartość , spowoduje to zaktualizowanie lub wstawienie wszystkich kolumn w tabeli docelowej przy założeniu, że tabela źródłowa ma te same kolumny co tabela docelowa. Jeśli tabela docelowa nie zawiera tych samych kolumn, zapytanie zgłasza błąd analizy.
Należy określić wartość dla każdej kolumny w tabeli podczas wykonywania operacji wstawiania (na przykład, gdy w istniejącym zestawie danych nie ma pasującego wiersza). Nie trzeba jednak aktualizować wszystkich wartości.
Aby wyświetlić wyniki, wykonaj zapytanie względem tabeli.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Odczytywanie tabeli
Dostęp do danych w tabelach delty można uzyskać według nazwy tabeli lub ścieżki tabeli, jak pokazano w następujących przykładach:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Zapisywanie w tabeli
Usługa Delta Lake używa standardowej składni do zapisywania danych w tabelach.
Aby niepodziecznie dodać nowe dane do istniejącej tabeli delty, użyj trybu dołączania, jak pokazano w poniższych przykładach:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Aby zastąpić wszystkie dane w tabeli, użyj trybu zastępowania, jak w następujących przykładach:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Aktualizowanie tabeli
Możesz zaktualizować dane zgodne z predykatem w tabeli delty. Na przykład w przykładowej people_10m
tabeli, aby zmienić skrót w gender
kolumnie z M
lub na Male
lub F
Female
, można uruchomić następujące polecenie:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Usuwanie z tabeli
Możesz usunąć dane zgodne z predykatem z tabeli delty. Na przykład w przykładowej people_10m
tabeli, aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDate
kolumnie sprzed 1955
, można uruchomić następujące polecenie:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Ważne
Usunięcie usuwa dane z najnowszej wersji tabeli delty, ale nie usuwa ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Zobacz czyszczenie , aby uzyskać szczegółowe informacje.
Wyświetlanie historii tabeli
Aby wyświetlić historię tabeli, należy użyć DeltaTable.history
metody dla języków Python i Scala oraz instrukcji DESCRIBE HISTORY w języku SQL, która zawiera informacje o pochodzenia, w tym wersję tabeli, operację, użytkownika i tak dalej, dla każdego zapisu w tabeli.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Wykonywanie zapytań względem wcześniejszej wersji tabeli (podróż czasowa)
Podróż w czasie usługi Delta Lake umożliwia wykonywanie zapytań dotyczących starszej migawki tabeli delty.
Aby wysłać zapytanie do starszej wersji tabeli, określ wersję lub sygnaturę czasową tabeli. Aby na przykład wysłać zapytanie o wersję 0 lub znacznik 2024-05-15T22:43:15.000+00:00Z
czasu z poprzedniej historii, użyj następującego polecenia:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2024-05-15T22:43:15.000+00:00"
lub "2024-05-15 22:43:15"
.
Opcje elementu DataFrameReader umożliwiają utworzenie ramki danych na podstawie tabeli delty, która jest stała do określonej wersji lub znacznika czasu tabeli, na przykład:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Aby uzyskać szczegółowe informacje, zobacz Praca z historią tabel usługi Delta Lake.
Optymalizowanie tabeli
Po wykonaniu wielu zmian w tabeli może istnieć wiele małych plików. Aby zwiększyć szybkość zapytań odczytu, możesz użyć operacji optymalizacji, aby zwinąć małe pliki na większe:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Kolejność Z według kolumn
Aby jeszcze bardziej zwiększyć wydajność odczytu, można połączyć powiązane informacje w tym samym zestawie plików przez porządkowanie z. Algorytmy pomijania danych usługi Delta Lake używają tej kolokacji, aby znacznie zmniejszyć ilość danych, które należy odczytać. Aby uzyskać dane kolejności z, należy określić kolumny do kolejności w kolejności z według operacji. Aby na przykład przeprowadzić sortowanie według gender
polecenia , uruchom polecenie:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Aby uzyskać pełny zestaw opcji dostępnych podczas uruchamiania operacji optymalizacji, zobacz Optymalizowanie układu pliku danych.
Czyszczenie migawek za pomocą polecenia VACUUM
Usługa Delta Lake zapewnia izolację migawek dla operacji odczytu, co oznacza, że można bezpiecznie uruchomić operację optymalizacji nawet wtedy, gdy inni użytkownicy lub zadania wysyłają zapytania do tabeli. W końcu jednak należy wyczyścić stare migawki. Można to zrobić, uruchamiając operację próżniową:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Aby uzyskać szczegółowe informacje na temat efektywnego korzystania z operacji czyszczenia, zobacz Usuwanie nieużywanych plików danych z próżnią.