Работа с разностными таблицами в Spark

Завершено

Вы можете работать с разностными таблицами (или файлами разностного формата) для получения и изменения данных несколькими способами.

Использование Spark SQL

Наиболее распространенный способ работы с данными в разностных таблицах в Spark — использовать Spark SQL. Инструкции SQL можно внедрить в другие языки (например, PySpark или Scala) с помощью библиотеки spark.sql . Например, следующий код вставляет строку в таблицу продуктов .

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Кроме того, можно использовать магию %%sql в записной книжке для запуска инструкций SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Использование РАЗНОСТНОГО API

Если вы хотите работать с разностными файлами, а не таблицами каталога, это может быть проще использовать API Delta Lake. Экземпляр DeltaTable можно создать из папки, содержащей файлы в разностном формате, а затем использовать API для изменения данных в таблице.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Использование времени для работы с управление версиями таблиц

Изменения, внесенные в разностные таблицы, регистрируются в журнале транзакций для таблицы. Журналы транзакций можно использовать для просмотра журнала изменений, внесенных в таблицу, и для получения старых версий данных (известных как путешествие по времени).

Чтобы просмотреть журнал таблицы, можно использовать DESCRIBE команду SQL, как показано здесь.

%%sql

DESCRIBE HISTORY products

Результаты этой инструкции показывают транзакции, примененные к таблице, как показано здесь (некоторые столбцы были пропущены):

версия TIMESTAMP Операция operationParameters
2 2023-04-04T21:46:43Z ОБНОВИТЬ {"предикат":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z ЗАПИСЬ {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z СОЗДАТЬ ТАБЛИЦУ {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Чтобы просмотреть журнал внешней таблицы, можно указать расположение папки вместо имени таблицы.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Вы можете получить данные из определенной версии данных, считывая разностное расположение файла в кадр данных, указав версию, необходимую versionAsOf в качестве параметра:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Кроме того, можно указать метку времени с помощью параметра timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)