Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks
Zarządzanie stanem opartym na bazie danych RocksDB można włączyć, ustawiając następującą konfigurację w usłudze SparkSession przed rozpoczęciem zapytania przesyłania strumieniowego.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Bazę danych RocksDB można włączyć w potokach delta Live Tables. Zobacz Optymalizowanie konfiguracji potoku pod kątem przetwarzania stanowego.
Włączanie tworzenia punktów kontrolnych dziennika zmian
W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć tworzenie punktów kontrolnych dziennika zmian w celu obniżenia czasu trwania punktu kontrolnego i całkowitego opóźnienia obciążeń przesyłania strumieniowego ze strukturą. Usługa Databricks zaleca włączenie punktów kontrolnych dziennika zmian dla wszystkich zapytań stanowych przesyłania strumieniowego ze strukturą.
Tradycyjnie rocksDB State Store migawki i przekazuje pliki danych podczas tworzenia punktów kontrolnych. Aby uniknąć tego kosztu, punkty kontrolne dziennika zmian zapisują tylko rekordy, które uległy zmianie od ostatniego punktu kontrolnego na trwały magazyn.
Punkty kontrolne dziennika zmian są domyślnie wyłączone. Punkty kontrolne dziennika zmian można włączyć na poziomie SparkSession przy użyciu następującej składni:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Możesz włączyć tworzenie punktów kontrolnych dziennika zmian w istniejącym strumieniu i obsługiwać informacje o stanie przechowywane w punkcie kontrolnym.
Ważne
Zapytania z włączonym tworzeniem punktów kontrolnych dziennika zmian można uruchamiać tylko w środowisku Databricks Runtime 13.3 LTS lub nowszym. Możesz wyłączyć tworzenie punktów kontrolnych dziennika zmian, aby przywrócić starsze zachowanie punktów kontrolnych, ale nadal należy uruchamiać te zapytania w środowisku Databricks Runtime 13.3 LTS lub nowszym. Aby te zmiany zostały wprowadzone, należy ponownie uruchomić zadanie.
Metryki magazynu stanów bazy danych RocksDB
Każdy operator stanu zbiera metryki związane z operacjami zarządzania stanami wykonywanymi w wystąpieniu bazy danych RocksDB, aby obserwować magazyn stanów i potencjalnie pomóc w debugowaniu spowolnienia zadań. Te metryki są agregowane (suma) na operator stanu w zadaniu we wszystkich zadaniach, w których jest uruchomiony operator stanu. Te metryki są częścią customMetrics
mapy wewnątrz stateOperators
pól w pliku StreamingQueryProgress
. Poniżej przedstawiono przykład w formularzu StreamingQueryProgress
JSON (uzyskanym przy użyciu metody StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Szczegółowe opisy metryk są następujące:
Nazwa metryki | opis |
---|---|
rocksdbCommitWriteBatchLatency | Czas (w milisach) wziął na zastosowanie przygotowanych zapisów w strukturze w pamięci (WriteBatch) do natywnej bazy danych RocksDB. |
rocksdbCommitFlushLatency | Czas (w milisach) trwał opróżnianie zmian w pamięci bazy danych RocksDB na dysku lokalnym. |
rocksdbCommitCompactLatency | Czas (w milisach) wziął na kompaktowanie (opcjonalnie) podczas zatwierdzania punktu kontrolnego. |
rocksdbCommitPauseLatency | Czas (w milisach) trwał zatrzymanie wątków procesu roboczego w tle (w przypadku kompaktowania itp.) w ramach zatwierdzenia punktu kontrolnego. |
rocksdbCommitCheckpointLatency | Czas (w milisie) wziął na utworzenie migawki natywnej bazy danych RocksDB i zapisanie go w katalogu lokalnym. |
rocksdbCommitFileSyncLatencyMs | Czas (w milisach) wziął na synchronizację natywnych plików powiązanych z migawką bazy danych RocksDB z zewnętrznym magazynem (lokalizacja punktu kontrolnego). |
rocksdbGetLatency | Średni czas (w nanos) wziął na bazowe wywołanie natywne RocksDB::Get . |
rocksdbPutCount | Średni czas (w nanos) wziął na bazowe wywołanie natywne RocksDB::Put . |
rocksdbGetCount | Liczba wywołań natywnych RocksDB::Get (nie obejmuje Gets funkcji WriteBatch — w partii pamięci używanej do przejściowych zapisów). |
rocksdbPutCount | Liczba wywołań natywnych RocksDB::Put (nie obejmuje Puts funkcji WriteBatch — w partii pamięci używanej do przejściowych zapisów). |
rocksdbTotalBytesReadByGet | Liczba nieskompresowanych bajtów odczytanych za pośrednictwem wywołań natywnych RocksDB::Get . |
rocksdbTotalBytesWrittenByPut | Liczba nieskompresowanych bajtów napisanych za pośrednictwem wywołań natywnych RocksDB::Put . |
rocksdbReadBlockCacheHitCount | Liczba przypadków użycia natywnej pamięci podręcznej blokowej Bazy danych RocksDB w celu uniknięcia odczytywania danych z dysku lokalnego. |
rocksdbReadBlockCacheMissCount | Liczba nieodebranych i wymaganych danych odczytu z dysku lokalnego przez natywną pamięć podręczną bloków RocksDB. |
rocksdbTotalBytesReadByCompaction | Liczba bajtów odczytanych z dysku lokalnego przez natywny proces kompaktowania bazy danych RocksDB. |
rocksdbTotalBytesWrittenByCompaction | Liczba bajtów zapisanych na dysku lokalnym przez natywny proces kompaktowania bazy danych RocksDB. |
rocksdbTotalCompactionLatencyMs | Czas (w milisach) wziął na kompaktowanie bazy danych RocksDB (tło i opcjonalne kompaktowanie zainicjowane podczas zatwierdzania). |
rocksdbWriterStallLatencyMs | Czas (w milis) zapis utknął w martwym punkcie z powodu kompaktowania tła lub opróżniania memtables na dysku. |
rocksdbTotalBytesReadThroughIterator | Niektóre operacje stanowe (takie jak przetwarzanie limitu czasu w flatMapGroupsWithState agregacjach okiennych lub znakowanie wodne) wymagają odczytywania całych danych w bazie danych za pośrednictwem iteratora. Całkowity rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora. |