Udostępnij za pośrednictwem


Samouczek: usługa Delta Lake

W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:

Przykładowy kod Python, Scala i SQL można uruchomić z poziomu notesu dołączonego do zasobu obliczeniowego usługi Azure Databricks, takiego jak klaster. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.

Przygotowywanie danych źródłowych

Ten samouczek opiera się na zestawie danych o nazwie People 10 M. Zawiera 10 milionów fikcyjnych zapisów, które zawierają fakty o ludziach, takich jak imię i nazwisko, data urodzenia i wynagrodzenie. W tym samouczku założono, że ten zestaw danych znajduje się w woluminie Unity Catalog, który jest skojarzony z docelowym środowiskiem pracy usługi Azure Databricks.

Aby użyć zestawu danych get dla 10 milionów osób w tym samouczku, wykonaj następujące czynności:

  1. Przejdź do strony Ludzie 10 M w Kaggle.
  2. Kliknij przycisk Pobierz , aby pobrać plik o nazwie archive.zip na komputerze lokalnym.
  3. Wyodrębnij plik o nazwie export.csv z archive.zip pliku. Plik export.csv zawiera dane na potrzeby tego samouczka.

Aby przekazać export.csv plik do woluminu, wykonaj następujące czynności:

  1. Na pasku bocznym kliknij pozycję Catalog.
  2. W programie Catalog Explorerprzejdź do woluminu where i otwórz go, aby przesłać plik export.csv.
  3. Kliknij pozycję Przekaż do tego woluminu.
  4. Przeciągnij i upuść lub przejdź do i selectpliku export.csv na komputerze lokalnym.
  5. Kliknij polecenie Przekaż.

W poniższych przykładach kodu zastąp /Volumes/main/default/my-volume/export.csv ciąg ścieżką do export.csv pliku w woluminie docelowym.

Tworzenie table

Wszystkie tables utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake. Usługa Databricks zaleca używanie Catalog zarządzanych tablesaparatu Unity.

W poprzednim przykładzie i poniższych przykładach kodu zastąp nazwę table i main.default.people_10m docelowym trójskładowym catalog, schemaoraz table w środowisku Unity Catalog.

Uwaga

Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i table tworzenia usługi Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Poprzednie operacje tworzą nowy zarządzany table. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia Delta table, zobacz CREATE TABLE.

W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć CREATE TABLE LIKE, aby utworzyć nową pustą Deltę table, która duplikuje właściwości schema i table źródłowej Delty table. Może to być szczególnie przydatne podczas promowania tables ze środowiska deweloperskiego do środowiska produkcyjnego, jak pokazano w poniższym przykładzie kodu:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Aby utworzyć pusty table, możesz również użyć interfejsu API DeltaTableBuilder w usłudze Delta Lake dla Python i Scala. W porównaniu z równoważnymi interfejsami API DataFrameWriter, te interfejsy API ułatwiają określanie dodatkowych informacji, takich jak komentarze column, właściwości table i wygenerowane columns.

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

operacja upsert do table

Aby scalić set aktualizacji i wstawiania do istniejącego Delta table, należy użyć metody DeltaTable.merge dla Python i Scalaoraz instrukcji MERGE INTO dla SQL. Na przykład, poniższy przykład pobiera dane ze źródła table i scala je z celem Delta table. Jeśli w obu tablesistnieje pasujący wiersz, usługa Delta Lake aktualizuje dane column przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

W SQL, jeśli określisz *, to aktualizuje lub wstawia wszystkie columns do docelowego table, przy założeniu, że źródłowy table ma taką samą columns jak docelowy table. Jeśli docelowy table nie ma tego samego columns, zapytanie zgłasza błąd analizy.

Należy określić wartość dla każdego column w table podczas wykonywania operacji insert (na przykład w przypadku braku pasującego wiersza w istniejącym zestawie danych). Nie trzeba jednak update wszystkich values.

Aby wyświetlić wyniki, wykonaj zapytanie dotyczące table.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Przeczytaj table

Dostęp do danych w usłudze Delta tables można uzyskać według nazwy table lub ścieżki table, jak pokazano w poniższych przykładach:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Napisz do table

Delta Lake używa standardowej składni do zapisywania danych w tables.

Aby atomowo dodać nowe dane do istniejącej Delta table, użyj trybu dołączania danych, jak to pokazano w poniższych przykładach:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Aby zastąpić wszystkie dane w table, użyj trybu zastępowania, jak w następujących przykładach:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Update a table

Możesz update dane, które odpowiadają predykatowi w tabeli Delta table. Na przykład w przykładzie people_10mtable, aby zmienić skrót w gendercolumn z M lub F na Male lub Female, można uruchomić następujące polecenie:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Usuń z table

Możesz remove dane zgodne z predykatem z Delty table. Na przykład w przykładzie people_10mtable, aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDatecolumn przed 1955, można uruchomić następujące polecenie:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Ważne

Usunięcie usuwa dane z najnowszej wersji table delty, ale nie remove ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Aby uzyskać szczegółowe informacje, zobacz vacuum.

Wyświetlanie historii table

Aby wyświetlić historię table, należy użyć metody DeltaTable.history dla języka Python i Scalaoraz instrukcji DESCRIBE HISTORY w języku SQL, która zawiera informacje o pochodzeniu, w tym wersję table, operację, użytkownika itd., dla każdego zapisu w table.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Wykonywanie zapytań o starszą wersję table (podróż w czasie)

Funkcja podróży w czasie w Delta Lake umożliwia wykonywanie zapytań do starszej migawki Delta table.

Aby wysłać zapytanie na starszą wersję table, określ wersję tablelub znacznik czasowy. Aby na przykład wysłać zapytanie o wersję 0 lub znacznik 2024-05-15T22:43:15.000+00:00Z czasu z poprzedniej historii, użyj następującego polecenia:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2024-05-15T22:43:15.000+00:00" lub "2024-05-15 22:43:15".

Opcje elementu DataFrameReader umożliwiają utworzenie DataFrame na podstawie table Delta, która jest przypisana do określonej wersji lub znacznika czasu table, na przykład:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Aby uzyskać szczegółowe informacje, zobacz Praca z Delta Lake table historią.

Optimize a table

Po wykonaniu wielu zmian w table, możesz mieć wiele małych plików. Aby zwiększyć szybkość odczytu zapytań, możesz użyć operacji optimize, aby zwinąć małe pliki na większe:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Porządek Z według columns

Aby jeszcze bardziej zwiększyć wydajność odczytu, można połączyć powiązane informacje w tym samym set plików przez porządkowanie z. Algorytmy pomijania danych usługi Delta Lake używają tej kolokacji, aby znacznie zmniejszyć ilość danych, które należy odczytać. Aby uporządkować dane w porządku z, należy określić columns, według którego ma być przeprowadzona operacja z-porządkowania. Aby na przykład przeprowadzić sortowanie według genderpolecenia , uruchom polecenie:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Aby uzyskać pełną set opcji dostępnych podczas uruchamiania operacji optimize, zobacz Optimize układ pliku danych.

Czyszczenie migawek za pomocą polecenia VACUUM

Usługa Delta Lake zapewnia izolację migawek podczas odczytu, co oznacza, że można bezpiecznie uruchomić operację optimize nawet wtedy, gdy inni użytkownicy lub zadania wykonują zapytania do table. W końcu jednak należy wyczyścić stare migawki. Można to zrobić, uruchamiając operację vacuum:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Aby uzyskać szczegółowe informacje na temat efektywnego wykorzystania operacji vacuum, zapoznaj się z nieużywanymi plikami danych Removevacuum.