Sdílet prostřednictvím


Kurz: Delta Lake

Tento kurz představuje běžné operace Delta Lake v Azure Databricks, včetně následujících:

Příklad kódu Pythonu, Scaly a SQL v tomto článku můžete spustit z poznámkového bloku připojeného k výpočetnímu prostředku Azure Databricks, jako je cluster. Kód SQL v tomto článku můžete spustit také z dotazu přidruženého k SQL Warehouse v Databricks SQL.

Příprava zdrojových dat

Tento kurz využívá datovou sadu s názvem Lidé 10 M. Obsahuje 10 milionů fiktivních záznamů, které obsahují fakta o lidech, jako jsou křestní jména a příjmení, datum narození a plat. Tento kurz předpokládá, že tato datová sada je ve svazku katalogu Unity, který je přidružený k cílovému pracovnímu prostoru Azure Databricks.

Pokud chcete získat datovou sadu People 10 M pro tento kurz, postupujte takto:

  1. Přejděte na stránku Lidé 10 M v Kaggle.
  2. Kliknutím na tlačítko Stáhnout stáhnete soubor s názvem archive.zip na místní počítač.
  3. Extrahujte soubor s názvem export.csv ze archive.zip souboru. Soubor export.csv obsahuje data pro tento kurz.

Pokud chcete soubor nahrát export.csv do svazku, postupujte takto:

  1. Na bočním panelu klikněte na Katalog.
  2. V Průzkumníku katalogu přejděte na svazek, do kterého chcete soubor nahrát, a otevřete ho export.csv .
  3. Klikněte na Nahrát na tento svazek.
  4. Přetáhněte soubor na místním počítači nebo ho export.csv vyberte a vyberte.
  5. Klikněte na tlačítko Odeslat.

V následujících příkladech kódu nahraďte /Volumes/main/default/my-volume/export.csv cestou k export.csv souboru ve vašem cílovém svazku.

Vytvoření tabulky

Všechny tabulky vytvořené v Azure Databricks ve výchozím nastavení používají Delta Lake. Databricks doporučuje používat spravované tabulky Katalogu Unity.

V předchozím příkladu kódu a v následujících příkladech kódu nahraďte název main.default.people_10m tabulky cílovým třídílným katalogem, schématem a názvem tabulky v katalogu Unity.

Poznámka:

Delta Lake je výchozí nastavení pro všechny příkazy pro čtení, zápisy a vytváření tabulek v Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Předchozí operace vytvoří novou spravovanou tabulku. Informace o dostupných možnostech při vytváření tabulky Delta najdete v tématu CREATE TABLE.

Ve verzi Databricks Runtime 13.3 LTS a vyšší můžete pomocí příkazu CREATE TABLE LIKE vytvořit novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky pro zdrojovou tabulku Delta. To může být užitečné zejména při propagaci tabulek z vývojového prostředí do produkčního prostředí, jak je znázorněno v následujícím příkladu kódu:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

K vytvoření prázdné tabulky můžete také použít DeltaTableBuilder rozhraní API v Delta Lake pro Python a Scala. V porovnání s ekvivalentními rozhraními API DataFrameWriter tato rozhraní API usnadňují zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.

Důležité

Tato funkce je ve verzi Public Preview.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Přenesení do tabulky

Ke sloučení sady aktualizací a vložení do existující tabulky Delta použijte metodu DeltaTable.merge pro Python a Scala a příkaz MERGE INTO pro SQL. Následující příklad například přebírá data ze zdrojové tabulky a slučuje je do cílové tabulky Delta. Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek. Tato operace se označuje jako upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Pokud v SQL zadáte *, tato aktualizace nebo vloží všechny sloupce v cílové tabulce za předpokladu, že zdrojová tabulka má stejné sloupce jako cílová tabulka. Pokud cílová tabulka nemá stejné sloupce, dotaz vyvolá chybu analýzy.

Při provádění operace vložení musíte zadat hodnotu pro každý sloupec v tabulce (například pokud v existující datové sadě neexistuje odpovídající řádek). Nemusíte ale aktualizovat všechny hodnoty.

Pokud chcete zobrazit výsledky, zadejte dotaz na tabulku.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Čtení tabulky

K datům v tabulkách Delta přistupujete podle názvu tabulky nebo cesty k tabulce, jak je znázorněno v následujících příkladech:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Zápis do tabulky

Delta Lake používá pro zápis dat do tabulek standardní syntaxi.

Pokud chcete do existující tabulky Delta přidat nová data atomicky, použijte režim připojení, jak je znázorněno v následujících příkladech:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Pokud chcete nahradit všechna data v tabulce, použijte režim přepsání jako v následujících příkladech:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Aktualizace tabulky

Data, která odpovídají predikátu v tabulce Delta, můžete aktualizovat. Například v ukázkové people_10m tabulce můžete změnit zkratku gender ve sloupci z M nebo F na Male nebo Female, můžete spustit následující:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Odstranění z tabulky

Data, která odpovídají predikátu, můžete odebrat z tabulky Delta. Například v ukázkové people_10m tabulce můžete odstranit všechny řádky odpovídající lidem s hodnotou ve birthDate sloupci před 1955spuštěním následujícího příkazu:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Důležité

Odstranění odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Podrobnosti najdete v vakuu .

Zobrazení historie tabulek

Pokud chcete zobrazit historii tabulky, použijete metodu DeltaTable.history pro Python a Scala a příkaz DESCRIBE HISTORY v SQL, který poskytuje informace o původu, včetně verze tabulky, operace, uživatele atd., pro každý zápis do tabulky.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Dotazování na starší verzi tabulky (časová cesta)

Časová cesta Delta Lake umožňuje dotazovat starší snímek tabulky Delta.

Pokud chcete zadat dotaz na starší verzi tabulky, zadejte její verzi nebo časové razítko. Pokud chcete například zadat dotaz na verzi 0 nebo časové razítko 2024-05-15T22:43:15.000+00:00Z z předchozí historie, použijte následující:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

U časových razítek se přijímají pouze řetězce data nebo časového razítka, "2024-05-15T22:43:15.000+00:00" například nebo "2024-05-15 22:43:15".

Možnosti třídy DataFrameReader umožňují vytvořit datový rámec z tabulky Delta, která je pevná pro konkrétní verzi nebo časové razítko tabulky, například:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Podrobnosti najdete v tématu Práce s historií tabulek Delta Lake.

Optimalizace tabulky

Po provedení několika změn v tabulce můžete mít velké množství malých souborů. Pokud chcete zvýšit rychlost dotazů na čtení, můžete pomocí operace optimalizace sbalit malé soubory do větších:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Pořadí vykreslování podle sloupců

Pokud chcete zvýšit výkon čtení, můžete sloučit související informace ve stejné sadě souborů řazením z. Algoritmy pro přeskočení dat Delta Lake používají tuto kolaci k výraznému snížení množství dat, která je potřeba číst. Chcete-li data pořadí vykreslování, zadáte sloupce, podle kterých se má pořadí podle operace provést. Pokud chcete například sloučit bajt, genderspusťte:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Úplnou sadu možností dostupných při spuštění operace optimalizace najdete v tématu Optimalizace rozložení datového souboru.

Vyčištění snímků pomocí VACUUM

Delta Lake poskytuje izolaci snímků pro čtení, což znamená, že je bezpečné spustit operaci optimalizace i v případě, že se na tabulku dotazují jiní uživatelé nebo úlohy. Nakonec byste ale měli staré snímky vyčistit. Můžete to provést spuštěním operace vakua:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Podrobnosti o efektivním používání operace vakua naleznete v tématu Odebrání nepoužívaných datových souborů s vakuem.