Tworzenie tabel usługi Delta Lake
Usługa Delta Lake jest oparta na tabelach, które zapewniają abstrakcję magazynu relacyjnego na plikach w usłudze Data Lake.
Tworzenie tabeli usługi Delta Lake na podstawie ramki danych
Jednym z najprostszych sposobów utworzenia tabeli usługi Delta Lake jest zapisanie ramki danych w formacie różnicowym , określając ścieżkę, w której powinny być przechowywane pliki danych i powiązane informacje o metadanych dla tabeli.
Na przykład następujący kod PySpark ładuje ramkę danych z istniejącym plikiem, a następnie zapisuje ramki danych w nowej lokalizacji folderu w formacie różnicowym :
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Po zapisaniu tabeli delty określona lokalizacja ścieżki zawiera pliki parquet dla danych (niezależnie od formatu pliku źródłowego załadowanego do ramki danych) i folderu _delta_log zawierającego dziennik transakcji dla tabeli.
Uwaga
Dziennik transakcji rejestruje wszystkie modyfikacje danych w tabeli. Rejestrując każdą modyfikację, można wymusić spójność transakcyjną i przechowywać informacje o wersji tabeli.
Istniejącą tabelę usługi Delta Lake można zastąpić zawartością ramki danych przy użyciu trybu zastępowania , jak pokazano poniżej:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
Możesz również dodać wiersze z ramki danych do istniejącej tabeli przy użyciu trybu dołączania:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Wprowadzanie aktualizacji warunkowych
Podczas gdy można wprowadzać modyfikacje danych w ramce danych, a następnie zastąpić tabelę usługi Delta Lake, zastępując ją, bardziej typowym wzorcem w bazie danych jest wstawianie, aktualizowanie lub usuwanie wierszy w istniejącej tabeli jako dyskretnych operacji transakcyjnych. Aby wprowadzić takie modyfikacje w tabeli usługi Delta Lake, możesz użyć obiektu DeltaTable w interfejsie API usługi Delta Lake, który obsługuje operacje aktualizacji, usuwania i scalania . Na przykład można użyć następującego kodu, aby zaktualizować kolumnę price dla wszystkich wierszy z wartością kolumny kategorii "Akcesoria":
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Modyfikacje danych są rejestrowane w dzienniku transakcji, a nowe pliki parquet są tworzone w folderze tabeli zgodnie z potrzebami.
Napiwek
Aby uzyskać więcej informacji na temat korzystania z interfejsu API usługi Delta Lake, zobacz dokumentację interfejsu API usługi Delta Lake.
Wykonywanie zapytań względem poprzedniej wersji tabeli
Tabele usługi Delta Lake obsługują przechowywanie wersji za pośrednictwem dziennika transakcji. Dziennik transakcji rejestruje modyfikacje wprowadzone w tabeli, zwracając uwagi na znacznik czasu i numer wersji dla każdej transakcji. Możesz użyć tych zarejestrowanych danych wersji, aby wyświetlić poprzednie wersje tabeli — funkcję znaną jako podróż w czasie.
Dane z określonej wersji tabeli usługi Delta Lake można pobrać, odczytując dane z lokalizacji tabeli różnicowej do ramki danych, określając wersję wymaganą versionAsOf
jako opcję:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Alternatywnie możesz określić znacznik czasu przy użyciu timestampAsOf
opcji :
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)