Udostępnij za pośrednictwem


Samouczek: usługa Delta Lake

W tym samouczku przedstawiono typowe operacje usługi Delta Lake w usłudze Azure Databricks, w tym następujące elementy:

Przykładowy kod Python, Scala i SQL można uruchomić z poziomu notesu dołączonego do zasobu obliczeniowego usługi Azure Databricks, takiego jak klaster. Możesz również uruchomić kod SQL w tym artykule z poziomu zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.

Przygotowywanie danych źródłowych

Ten samouczek opiera się na zestawie danych o nazwie People 10 M. Zawiera 10 milionów fikcyjnych zapisów, które zawierają fakty o ludziach, takich jak imię i nazwisko, data urodzenia i wynagrodzenie. W tym samouczku założono, że ten zestaw danych znajduje się w woluminie wykazu aparatu Unity skojarzonym z docelowym obszarem roboczym usługi Azure Databricks.

Aby uzyskać zestaw danych Osób 10 M dla tego samouczka, wykonaj następujące czynności:

  1. Przejdź do strony Ludzie 10 M w Kaggle.
  2. Kliknij przycisk Pobierz , aby pobrać plik o nazwie archive.zip na komputerze lokalnym.
  3. Wyodrębnij plik o nazwie export.csv z archive.zip pliku. Plik export.csv zawiera dane na potrzeby tego samouczka.

Aby przekazać export.csv plik do woluminu, wykonaj następujące czynności:

  1. Na pasku bocznym kliknij pozycję Wykaz.
  2. W Eksploratorze wykazu przejdź do i otwórz wolumin, w którym chcesz przekazać export.csv plik.
  3. Kliknij pozycję Przekaż do tego woluminu.
  4. Przeciągnij i upuść lub przejdź do i wybierz export.csv plik na komputerze lokalnym.
  5. Kliknij polecenie Przekaż.

W poniższych przykładach kodu zastąp /Volumes/main/default/my-volume/export.csv ciąg ścieżką do export.csv pliku w woluminie docelowym.

Tworzenie tabeli

Wszystkie tabele utworzone w usłudze Azure Databricks domyślnie używają usługi Delta Lake. Usługa Databricks zaleca używanie tabel zarządzanych w wykazie aparatu Unity.

W poprzednim przykładzie kodu i poniższych przykładach kodu zastąp nazwę tabeli docelowym main.default.people_10m katalogiem trzyczęściowym, schematem i nazwą tabeli w wykazie aparatu Unity.

Uwaga

Usługa Delta Lake jest domyślną wartością dla wszystkich poleceń odczytu, zapisu i tworzenia tabel w usłudze Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Poprzednie operacje tworzą nową zarządzaną tabelę. Aby uzyskać informacje o dostępnych opcjach podczas tworzenia tabeli delty, zobacz CREATE TABLE (TWORZENIE TABELI).

W środowisku Databricks Runtime 13.3 LTS i nowszym można użyć polecenia CREATE TABLE LIKE , aby utworzyć nową pustą tabelę delty, która duplikuje właściwości schematu i tabeli źródłowej tabeli delty. Może to być szczególnie przydatne podczas promowania tabel ze środowiska deweloperskiego do środowiska produkcyjnego, jak pokazano w poniższym przykładzie kodu:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Aby utworzyć pustą tabelę, możesz również użyć interfejsu DeltaTableBuilder API w usłudze Delta Lake dla języków Python i Scala. W porównaniu z równoważnymi interfejsami API elementu DataFrameWriter te interfejsy API ułatwiają określanie dodatkowych informacji, takich jak komentarze kolumn, właściwości tabeli i wygenerowane kolumny.

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert do tabeli

Aby scalić zestaw aktualizacji i wstawiania do istniejącej tabeli delty, należy użyć DeltaTable.merge metody dla języków Python i Scala oraz instrukcji MERGE INTO dla języka SQL. Na przykład poniższy przykład pobiera dane z tabeli źródłowej i scala je z docelową tabelą delty. Jeśli w obu tabelach istnieje pasujący wiersz, usługa Delta Lake aktualizuje kolumnę danych przy użyciu danego wyrażenia. Jeśli nie ma pasującego wiersza, usługa Delta Lake dodaje nowy wiersz. Ta operacja jest nazywana operacją upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

W programie SQL, jeśli określisz *wartość , spowoduje to zaktualizowanie lub wstawienie wszystkich kolumn w tabeli docelowej przy założeniu, że tabela źródłowa ma te same kolumny co tabela docelowa. Jeśli tabela docelowa nie zawiera tych samych kolumn, zapytanie zgłasza błąd analizy.

Należy określić wartość dla każdej kolumny w tabeli podczas wykonywania operacji wstawiania (na przykład, gdy w istniejącym zestawie danych nie ma pasującego wiersza). Nie trzeba jednak aktualizować wszystkich wartości.

Aby wyświetlić wyniki, wykonaj zapytanie względem tabeli.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Odczytywanie tabeli

Dostęp do danych w tabelach delty można uzyskać według nazwy tabeli lub ścieżki tabeli, jak pokazano w następujących przykładach:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Zapisywanie w tabeli

Usługa Delta Lake używa standardowej składni do zapisywania danych w tabelach.

Aby niepodziecznie dodać nowe dane do istniejącej tabeli delty, użyj trybu dołączania, jak pokazano w poniższych przykładach:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Aby zastąpić wszystkie dane w tabeli, użyj trybu zastępowania, jak w następujących przykładach:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Aktualizowanie tabeli

Możesz zaktualizować dane zgodne z predykatem w tabeli delty. Na przykład w przykładowej people_10m tabeli, aby zmienić skrót w gender kolumnie z M lub na Male lub F Female, można uruchomić następujące polecenie:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Usuwanie z tabeli

Możesz usunąć dane zgodne z predykatem z tabeli delty. Na przykład w przykładowej people_10m tabeli, aby usunąć wszystkie wiersze odpowiadające osobom z wartością w birthDate kolumnie sprzed 1955, można uruchomić następujące polecenie:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Ważne

Usunięcie usuwa dane z najnowszej wersji tabeli delty, ale nie usuwa ich z magazynu fizycznego, dopóki stare wersje nie zostaną jawnie opróżnione. Zobacz czyszczenie , aby uzyskać szczegółowe informacje.

Wyświetlanie historii tabeli

Aby wyświetlić historię tabeli, należy użyć DeltaTable.history metody dla języków Python i Scala oraz instrukcji DESCRIBE HISTORY w języku SQL, która zawiera informacje o pochodzenia, w tym wersję tabeli, operację, użytkownika i tak dalej, dla każdego zapisu w tabeli.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Wykonywanie zapytań względem wcześniejszej wersji tabeli (podróż czasowa)

Podróż w czasie usługi Delta Lake umożliwia wykonywanie zapytań dotyczących starszej migawki tabeli delty.

Aby wysłać zapytanie do starszej wersji tabeli, określ wersję lub sygnaturę czasową tabeli. Aby na przykład wysłać zapytanie o wersję 0 lub znacznik 2024-05-15T22:43:15.000+00:00Z czasu z poprzedniej historii, użyj następującego polecenia:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

W przypadku znaczników czasu akceptowane są tylko ciągi daty lub znacznika czasu, na przykład "2024-05-15T22:43:15.000+00:00" lub "2024-05-15 22:43:15".

Opcje elementu DataFrameReader umożliwiają utworzenie ramki danych na podstawie tabeli delty, która jest stała do określonej wersji lub znacznika czasu tabeli, na przykład:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Aby uzyskać szczegółowe informacje, zobacz Praca z historią tabel usługi Delta Lake.

Optymalizowanie tabeli

Po wykonaniu wielu zmian w tabeli może istnieć wiele małych plików. Aby zwiększyć szybkość zapytań odczytu, możesz użyć operacji optymalizacji, aby zwinąć małe pliki na większe:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Kolejność Z według kolumn

Aby jeszcze bardziej zwiększyć wydajność odczytu, można połączyć powiązane informacje w tym samym zestawie plików przez porządkowanie z. Algorytmy pomijania danych usługi Delta Lake używają tej kolokacji, aby znacznie zmniejszyć ilość danych, które należy odczytać. Aby uzyskać dane kolejności z, należy określić kolumny do kolejności w kolejności z według operacji. Aby na przykład przeprowadzić sortowanie według genderpolecenia , uruchom polecenie:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Aby uzyskać pełny zestaw opcji dostępnych podczas uruchamiania operacji optymalizacji, zobacz Optymalizowanie układu pliku danych.

Czyszczenie migawek za pomocą polecenia VACUUM

Usługa Delta Lake zapewnia izolację migawek dla operacji odczytu, co oznacza, że można bezpiecznie uruchomić operację optymalizacji nawet wtedy, gdy inni użytkownicy lub zadania wysyłają zapytania do tabeli. W końcu jednak należy wyczyścić stare migawki. Można to zrobić, uruchamiając operację próżniową:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Aby uzyskać szczegółowe informacje na temat efektywnego korzystania z operacji czyszczenia, zobacz Usuwanie nieużywanych plików danych z próżnią.