Praca z tabelami różnicowymi na platformie Spark
Możesz pracować z tabelami różnicowymi (lub plikami formatu różnicowego), aby pobierać i modyfikować dane na wiele sposobów.
Korzystanie z usługi Spark SQL
Najczęstszym sposobem pracy z danymi w tabelach różnicowych na platformie Spark jest użycie usługi Spark SQL. Instrukcje SQL można osadzać w innych językach (takich jak PySpark lub Scala) przy użyciu biblioteki spark.sql . Na przykład poniższy kod wstawia wiersz do tabeli products .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Alternatywnie możesz użyć %%sql
magii w notesie do uruchamiania instrukcji SQL.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Korzystanie z interfejsu API delty
Jeśli chcesz pracować z plikami różnicowymi, a nie tabelami wykazu, użycie interfejsu API usługi Delta Lake może być prostsze. Możesz utworzyć wystąpienie tabeli deltaTable z lokalizacji folderu zawierającej pliki w formacie różnicowym, a następnie zmodyfikować dane w tabeli przy użyciu interfejsu API.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Korzystanie z podróży czasowej do pracy z przechowywaniem wersji tabel
Modyfikacje tabel różnicowych są rejestrowane w dzienniku transakcji dla tabeli. Zarejestrowane transakcje umożliwiają wyświetlanie historii zmian wprowadzonych w tabeli oraz pobieranie starszych wersji danych (znanych jako podróż w czasie)
Aby wyświetlić historię tabeli, możesz użyć DESCRIBE
polecenia SQL, jak pokazano tutaj.
%%sql
DESCRIBE HISTORY products
Wyniki tej instrukcji pokazują transakcje, które zostały zastosowane do tabeli, jak pokazano tutaj (niektóre kolumny zostały pominięte):
version | sygnatura czasowa | rozdzielnicy | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | AKTUALIZUJ | {"predykate":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | NAPISZ | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":""{}} |
Aby wyświetlić historię tabeli zewnętrznej, możesz określić lokalizację folderu zamiast nazwy tabeli.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Dane można pobrać z określonej wersji danych, odczytując lokalizację pliku różnicowego do ramki danych, określając wersję wymaganą versionAsOf
jako opcję:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Alternatywnie możesz określić znacznik czasu przy użyciu timestampAsOf
opcji :
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)