Работа с разностными таблицами в Spark
Вы можете работать с разностными таблицами (или файлами разностного формата) для получения и изменения данных несколькими способами.
Использование Spark SQL
Наиболее распространенный способ работы с данными в разностных таблицах в Spark — использовать Spark SQL. Инструкции SQL можно внедрить в другие языки (например, PySpark или Scala) с помощью библиотеки spark.sql . Например, следующий код вставляет строку в таблицу продуктов .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Кроме того, можно использовать магию %%sql
в записной книжке для запуска инструкций SQL.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Использование РАЗНОСТНОГО API
Если вы хотите работать с разностными файлами, а не таблицами каталога, это может быть проще использовать API Delta Lake. Экземпляр DeltaTable можно создать из папки, содержащей файлы в разностном формате, а затем использовать API для изменения данных в таблице.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Использование времени для работы с управление версиями таблиц
Изменения, внесенные в разностные таблицы, регистрируются в журнале транзакций для таблицы. Журналы транзакций можно использовать для просмотра журнала изменений, внесенных в таблицу, и для получения старых версий данных (известных как путешествие по времени).
Чтобы просмотреть журнал таблицы, можно использовать DESCRIBE
команду SQL, как показано здесь.
%%sql
DESCRIBE HISTORY products
Результаты этой инструкции показывают транзакции, примененные к таблице, как показано здесь (некоторые столбцы были пропущены):
версия | TIMESTAMP | Операция | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | ОБНОВИТЬ | {"предикат":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | ЗАПИСЬ | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | СОЗДАТЬ ТАБЛИЦУ | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
Чтобы просмотреть журнал внешней таблицы, можно указать расположение папки вместо имени таблицы.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Вы можете получить данные из определенной версии данных, считывая разностное расположение файла в кадр данных, указав версию, необходимую versionAsOf
в качестве параметра:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Кроме того, можно указать метку времени с помощью параметра timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)