Najlepsze praktyki dotyczące usługi Delta Lake
W tym artykule opisano najlepsze rozwiązania dotyczące korzystania z usługi Delta Lake.
Usługa Databricks zaleca korzystanie z optymalizacji predykcyjnej. Zobacz Optymalizacja predykcyjna dla tabel zarządzanych w wykazie aparatu Unity.
Podczas usuwania i ponownego tworzenia tabeli w tej samej lokalizacji należy zawsze użyć instrukcji CREATE OR REPLACE TABLE
. Zobacz Usuwanie lub zastępowanie tabeli delty.
Usuwanie starszych konfiguracji funkcji delta
Usługa Databricks zaleca usunięcie najbardziej jawnych starszych konfiguracji delty z konfiguracji platformy Spark i właściwości tabeli podczas uaktualniania do nowej wersji środowiska Databricks Runtime. Starsze konfiguracje mogą uniemożliwiać stosowanie nowych optymalizacji i wartości domyślnych wprowadzonych przez usługę Databricks do migrowanych obciążeń.
Używanie klastrowania liquid w celu zoptymalizowanego pomijania danych
Usługa Databricks zaleca używanie klastrowania liquid, a nie partycjonowania, kolejności Z lub innych strategii organizacji danych w celu optymalizacji układu danych pod kątem pomijania danych. Zobacz Użyj płynnego klastrowania dla tabel typu Delta).
Kompaktowanie plików
Optymalizacja predykcyjna jest uruchamiana automatycznie OPTIMIZE
i VACUUM
polecenia w tabelach zarządzanych wykazu aparatu Unity. Zobacz Optymalizacja predykcyjna dla tabel zarządzanych w wykazie aparatu Unity.
Usługa Databricks zaleca częste uruchamianie polecenia OPTIMIZE w celu kompaktowania małych plików.
Uwaga
Ta operacja nie powoduje usunięcia starych plików. Aby je usunąć, uruchom polecenie VACUUM .
Zastępowanie zawartości lub schematu tabeli
Czasami możesz zastąpić tabelę delty. Na przykład:
- Dane w tabeli są niepoprawne i chcesz zamienić zawartość.
- Chcesz ponownie napisać całą tabelę, aby wykonać niezgodne zmiany schematu (takie jak zmiana typów kolumn).
Chociaż można usunąć cały katalog tabeli delty i utworzyć nową tabelę w tej samej ścieżce, nie jest to zalecane , ponieważ:
- Usuwanie katalogu nie jest wydajne. Usunięcie katalogu zawierającego bardzo duże pliki może potrwać kilka godzin, a nawet dni.
- Utracisz całą zawartość usuniętych plików; Odzyskanie nieprawidłowej tabeli jest trudne.
- Usunięcie katalogu nie jest niepodzielne. Podczas usuwania tabeli równoczesne odczytywanie tabeli może zakończyć się niepowodzeniem lub zobaczyć częściową tabelę.
Jeśli nie musisz zmieniać schematu tabeli, możesz usunąć dane z tabeli delty i wstawić nowe dane lub zaktualizować tabelę, aby naprawić nieprawidłowe wartości.
Jeśli chcesz zmienić schemat tabeli, możesz zastąpić całą tabelę niepodziealnie. Na przykład:
Python
dataframe.write \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("<your-table>") # Managed table
dataframe.write \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.option("path", "<your-table-path>") \
.saveAsTable("<your-table>") # External table
SQL
REPLACE TABLE <your-table> AS SELECT ... -- Managed table
REPLACE TABLE <your-table> LOCATION "<your-table-path>" AS SELECT ... -- External table
Scala
dataframe.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable("<your-table>") // Managed table
dataframe.write
.mode("overwrite")
.option("overwriteSchema", "true")
.option("path", "<your-table-path>")
.saveAsTable("<your-table>") // External table
Istnieje wiele korzyści z tego podejścia:
- Zastępowanie tabeli jest znacznie szybsze, ponieważ nie musi ponownie wyświetlać listy katalogów ani usuwać żadnych plików.
- Stara wersja tabeli nadal istnieje. Jeśli usuniesz niewłaściwą tabelę, możesz łatwo pobrać stare dane przy użyciu podróży w czasie. Zobacz Praca z historią tabel platformy Delta Lake.
- Jest to operacja niepodzielna. Zapytania współbieżne nadal mogą odczytywać tabelę podczas usuwania tabeli.
- Ze względu na gwarancje transakcji Delta Lake ACID, jeśli zastąpienie tabeli zakończy się niepowodzeniem, tabela będzie w poprzednim stanie.
Ponadto, jeśli chcesz usunąć stare pliki, aby zaoszczędzić koszty magazynowania po zastąpieniu tabeli, możesz użyć funkcji VACUUM , aby je usunąć. Jest ona zoptymalizowana pod kątem usuwania plików i zwykle jest szybsza niż usuwanie całego katalogu.
Buforowanie platformy Spark
Usługa Databricks nie zaleca używania buforowania platformy Spark z następujących powodów:
- Utracisz wszelkie pomijanie danych, które mogą pochodzić z dodatkowych filtrów dodanych w pamięci podręcznej
DataFrame
. - Dane, które są buforowane, mogą nie zostać zaktualizowane, jeśli do tabeli uzyskuje się dostęp przy użyciu innego identyfikatora.
Różnice między usługą Delta Lake i Parquet na platformie Apache Spark
Usługa Delta Lake automatycznie obsługuje następujące operacje. Nigdy nie należy wykonywać tych operacji ręcznie:
REFRESH TABLE
: Tabele różnicowe zawsze zwracają najbardziej aktualne informacje, więc nie ma potrzeby ręcznego wywoływaniaREFRESH TABLE
po zmianach.- Dodawanie i usuwanie partycji: usługa Delta Lake automatycznie śledzi zestaw partycji znajdujących się w tabeli i aktualizuje listę w miarę dodawania lub usuwania danych. W związku z tym nie ma potrzeby uruchamiania
ALTER TABLE [ADD|DROP] PARTITION
aniMSCK
. - Ładowanie pojedynczej partycji: bezpośrednie odczytywanie partycji nie jest konieczne. Na przykład nie trzeba uruchamiać polecenia
spark.read.format("parquet").load("/data/date=2017-01-01")
. Zamiast tego należy użyć klauzuliWHERE
pomijania danych, takiej jakspark.read.table("<table-name>").where("date = '2017-01-01'")
. - Nie modyfikuj ręcznie plików danych: usługa Delta Lake używa dziennika transakcji do zatwierdzania zmian w tabeli niepodzielnie. Nie należy bezpośrednio modyfikować, dodawać ani usuwać plików danych Parquet w tabeli delty, ponieważ może to prowadzić do utraty danych lub uszkodzenia tabeli.
Zwiększanie wydajności scalania usługi Delta Lake
Możesz skrócić czas potrzebny na scalenie, korzystając z następujących metod:
Zmniejsz przestrzeń wyszukiwania dla dopasowań: domyślnie
merge
operacja przeszukuje całą tabelę delty w celu znalezienia dopasowań w tabeli źródłowej. Jednym ze sposobów przyspieszeniamerge
jest zmniejszenie przestrzeni wyszukiwania przez dodanie znanych ograniczeń w warunku dopasowania. Załóżmy na przykład, że masz tabelę podzieloną nacountry
partycje idate
chcesz użyćmerge
jej do zaktualizowania informacji z ostatniego dnia i określonego kraju. Dodanie następującego warunku sprawia, że zapytanie jest szybsze, ponieważ wyszukuje dopasowania tylko w odpowiednich partycjach:events.date = current_date() AND events.country = 'USA'
Ponadto to zapytanie zmniejsza również prawdopodobieństwo konfliktów z innymi operacjami współbieżnymi. Aby uzyskać więcej informacji, zobacz Poziomy izolacji i konflikty zapisu w usłudze Azure Databricks .
Pliki kompaktowe: jeśli dane są przechowywane w wielu małych plikach, odczytywanie danych w celu wyszukiwania dopasowań może stać się powolne. W celu zwiększenia przepływności odczytu można skompaktować małe pliki w większe pliki. Aby uzyskać szczegółowe informacje, zobacz Optymalizowanie układu pliku danych.
Kontroluj partycje mieszania dla operacji zapisu:
merge
operacja tasuje dane wiele razy w celu obliczenia i zapisania zaktualizowanych danych. Liczba zadań używanych do mieszania jest kontrolowana przez konfiguracjęspark.sql.shuffle.partitions
sesji platformy Spark. Ustawienie tego parametru nie tylko steruje równoległością, ale także określa liczbę plików wyjściowych. Zwiększenie wartości zwiększa równoległość, ale także generuje większą liczbę mniejszych plików danych.Włącz zoptymalizowane zapisy: w przypadku tabel
merge
partycjonowanych może utworzyć znacznie większą liczbę małych plików niż liczba partycji mieszania. Dzieje się tak, ponieważ każde zadanie mieszania może zapisywać wiele plików w wielu partycjach i może stać się wąskim gardłem wydajności. Liczbę plików można zmniejszyć, włączając zoptymalizowane zapisy. Zobacz Zoptymalizowane zapisy dla usługi Delta Lake w usłudze Azure Databricks.Dostrajanie rozmiarów plików w tabeli: usługa Azure Databricks może automatycznie wykrywać, czy tabela delty ma częste
merge
operacje, które ponownie zapisują pliki i mogą zmniejszyć rozmiar plików przepisanych w oczekiwaniu na dalsze ponowne zapisywanie plików w przyszłości. Aby uzyskać szczegółowe informacje, zobacz sekcję dotyczącą dostrajania rozmiarów plików.Low Shuffle Merge: Low Shuffle Merge zapewnia zoptymalizowaną implementację
MERGE
, która zapewnia lepszą wydajność dla najbardziej typowych obciążeń. Ponadto zachowuje istniejące optymalizacje układu danych, takie jak porządkowanie Z na niezmodyfikowanych danych.
Zarządzanie recencyjnością danych
Na początku każdego zapytania tabele delty są automatycznie aktualizowane do najnowszej wersji tabeli. Ten proces można zaobserwować w notesach, gdy polecenie zgłasza stan polecenia: Updating the Delta table's state
. Jednak podczas uruchamiania analizy historycznej w tabeli może nie być konieczne pobieranie danych z ostatnich minut, zwłaszcza w przypadku tabel, w których często pozyskiwane są dane przesyłane strumieniowo. W takich przypadkach zapytania mogą być uruchamiane na nieaktualnych migawkach tabeli delty. Takie podejście może zmniejszyć opóźnienie w uzyskiwaniu wyników z zapytań.
Można skonfigurować tolerancję dla nieaktualnych danych, ustawiając konfigurację spark.databricks.delta.stalenessLimit
sesji platformy Spark z wartością ciągu czasu, taką jak 1h
lub 15m
(odpowiednio przez 1 godzinę lub 15 minut). Ta konfiguracja jest specyficzna dla sesji i nie ma wpływu na innych klientów, którzy uzyskują dostęp do tabeli. Jeśli stan tabeli został zaktualizowany w limicie nieaktualności, zapytanie względem tabeli zwraca wyniki bez oczekiwania na najnowszą aktualizację tabeli. To ustawienie nigdy nie uniemożliwia aktualizowaniu tabeli, a gdy są zwracane nieaktualne dane, proces aktualizacji w tle. Jeśli ostatnia aktualizacja tabeli jest starsza niż limit nieaktualności, zapytanie nie zwraca wyników do momentu zakończenia aktualizacji stanu tabeli.
Ulepszone punkty kontrolne dla zapytań o małych opóźnieniach
Usługa Delta Lake zapisuje punkty kontrolne jako stan agregacji tabeli delty z zoptymalizowaną częstotliwością. Te punkty kontrolne służą jako punkt wyjścia do obliczenia najnowszego stanu tabeli. Bez punktów kontrolnych usługa Delta Lake musiałaby odczytać dużą kolekcję plików JSON ("pliki delta" reprezentujące zatwierdzenia w dzienniku transakcji w celu obliczenia stanu tabeli. Ponadto statystyki na poziomie kolumny usługi Delta Lake używają funkcji delta lake do wykonywania pomijania danych są przechowywane w punkcie kontrolnym.
Ważne
Punkty kontrolne usługi Delta Lake różnią się od punktów kontrolnych przesyłania strumieniowego ze strukturą. Zobacz Ustrukturyzowane punkty kontrolne przesyłania strumieniowego.
Statystyki na poziomie kolumny są przechowywane jako struktura i kod JSON (w celu zapewnienia zgodności z poprzednimi wersjami). Format struktury sprawia, że funkcja Delta Lake odczytuje znacznie szybciej, ponieważ:
- Usługa Delta Lake nie wykonuje kosztownej analizy JSON w celu uzyskania statystyk na poziomie kolumny.
- Możliwości oczyszczania kolumn Parquet znacznie zmniejszają liczbę operacji we/wy wymaganych do odczytania statystyk dla kolumny.
Format struktury umożliwia zbieranie optymalizacji, które zmniejszają obciążenie operacji odczytu usługi Delta Lake od sekund do dziesiątek milisekund, co znacznie zmniejsza opóźnienie w przypadku krótkich zapytań.
Zarządzanie statystykami na poziomie kolumny w punktach kontrolnych
Zarządzasz sposobem zapisywania statystyk w punktach kontrolnych przy użyciu właściwości delta.checkpoint.writeStatsAsJson
tabeli i delta.checkpoint.writeStatsAsStruct
. Jeśli obie właściwości tabeli to false
, usługa Delta Lake nie może wykonać pomijania danych.
- Usługa Batch zapisuje statystyki zapisu zarówno w formacie JSON, jak i w formacie struktury. Parametr
delta.checkpoint.writeStatsAsJson
ma wartośćtrue
. delta.checkpoint.writeStatsAsStruct
jest domyślnie niezdefiniowany.- Czytelnicy używają kolumny struktury, gdy są dostępne, a w przeciwnym razie wracają do korzystania z kolumny JSON.
Ważne
Ulepszone punkty kontrolne nie przerywają zgodności z czytnikami usługi Delta Lake typu open source. Jednak ustawienie delta.checkpoint.writeStatsAsJson
wartości false
może mieć wpływ na zastrzeżone czytniki usługi Delta Lake. Skontaktuj się z dostawcami, aby dowiedzieć się więcej o implikacjach dotyczących wydajności.
Włączanie rozszerzonych punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą
Jeśli obciążenia przesyłania strumieniowego ze strukturą nie mają wymagań dotyczących małych opóźnień (opóźnienia podrzędne), możesz włączyć rozszerzone punkty kontrolne, uruchamiając następujące polecenie SQL:
ALTER TABLE <table-name> SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')
Możesz również poprawić opóźnienie zapisu punktu kontrolnego, ustawiając następujące właściwości tabeli:
ALTER TABLE <table-name> SET TBLPROPERTIES
(
'delta.checkpoint.writeStatsAsStruct' = 'true',
'delta.checkpoint.writeStatsAsJson' = 'false'
)
Jeśli pomijanie danych nie jest przydatne w aplikacji, możesz ustawić obie właściwości na wartość false. Następnie nie są zbierane ani zapisywane żadne statystyki. Usługa Databricks nie zaleca tej konfiguracji.