Praca z tabelami różnicowymi na platformie Spark

Ukończone

Możesz pracować z tabelami różnicowymi (lub plikami formatu różnicowego), aby pobierać i modyfikować dane na wiele sposobów.

Korzystanie z usługi Spark SQL

Najczęstszym sposobem pracy z danymi w tabelach różnicowych na platformie Spark jest użycie usługi Spark SQL. Instrukcje SQL można osadzać w innych językach (takich jak PySpark lub Scala) przy użyciu biblioteki spark.sql . Na przykład poniższy kod wstawia wiersz do tabeli products .

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Alternatywnie możesz użyć %%sql magii w notesie do uruchamiania instrukcji SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Korzystanie z interfejsu API delty

Jeśli chcesz pracować z plikami różnicowymi, a nie tabelami wykazu, użycie interfejsu API usługi Delta Lake może być prostsze. Możesz utworzyć wystąpienie tabeli deltaTable z lokalizacji folderu zawierającej pliki w formacie różnicowym, a następnie zmodyfikować dane w tabeli przy użyciu interfejsu API.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Korzystanie z podróży czasowej do pracy z przechowywaniem wersji tabel

Modyfikacje tabel różnicowych są rejestrowane w dzienniku transakcji dla tabeli. Zarejestrowane transakcje umożliwiają wyświetlanie historii zmian wprowadzonych w tabeli oraz pobieranie starszych wersji danych (znanych jako podróż w czasie)

Aby wyświetlić historię tabeli, możesz użyć DESCRIBE polecenia SQL, jak pokazano tutaj.

%%sql

DESCRIBE HISTORY products

Wyniki tej instrukcji pokazują transakcje, które zostały zastosowane do tabeli, jak pokazano tutaj (niektóre kolumny zostały pominięte):

version sygnatura czasowa rozdzielnicy operationParameters
2 2023-04-04T21:46:43Z AKTUALIZUJ {"predykate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z NAPISZ {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":""{}}

Aby wyświetlić historię tabeli zewnętrznej, możesz określić lokalizację folderu zamiast nazwy tabeli.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Dane można pobrać z określonej wersji danych, odczytując lokalizację pliku różnicowego do ramki danych, określając wersję wymaganą versionAsOf jako opcję:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Alternatywnie możesz określić znacznik czasu przy użyciu timestampAsOf opcji :

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)