Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks
Zarządzanie stanem opartym na bazie danych RocksDB można włączyć, ustawiając następującą konfigurację w usłudze SparkSession przed rozpoczęciem zapytania przesyłania strumieniowego.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Bazę danych RocksDB można włączyć w potokach DLT. Zobacz Optymalizowanie konfiguracji potoku na potrzeby przetwarzania stanowego.
Włącz tworzenie punktów kontrolnych dziennika zmian
W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć punkt kontrolny dziennika zmian w celu obniżenia czasu trwania punktu kontrolnego oraz ogólnego opóźnienia obciążeń związanych ze strumieniowym przesyłaniem danych. Usługa Databricks zaleca włączenie punktów kontrolnych dziennika zmian dla wszystkich zapytań stanowych przesyłania strumieniowego ze strukturą.
Tradycyjnie RocksDB State Store wykonuje migawki i wysyła pliki danych podczas tworzenia punktów kontrolnych. Aby uniknąć tego kosztu, punkty kontrolne w dzienniku zmian zapisują do trwałego magazynu tylko te rekordy, które uległy zmianie od ostatniego punktu kontrolnego.
Punkty kontrolne dziennika zmian są domyślnie wyłączone. Punkty kontrolne dziennika zmian można włączyć na poziomie SparkSession przy użyciu następującej składni:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Możesz włączyć punkty kontrolne dziennika zmian w istniejącym strumieniu i utrzymywać informacje o stanie przechowywane w punkcie kontrolnym.
Ważne
Zapytania, które mają włączone tworzenie punktów kontrolnych dziennika zmian, można uruchamiać tylko w środowisku Databricks Runtime 13.3 LTS lub nowszym. Możesz wyłączyć zapisywanie stanu dziennika zmian, aby przywrócić wcześniejszy sposób działania punktów kontrolnych, ale nadal musisz uruchamiać te zapytania w środowisku Databricks Runtime 13.3 LTS lub nowszym. Aby te zmiany zostały wprowadzone, należy ponownie uruchomić zadanie.
Metryki przechowalni stanów RocksDB
Każdy operator stanu zbiera metryki związane z operacjami zarządzania stanami wykonywanymi w wystąpieniu bazy danych RocksDB, aby obserwować magazyn stanów i potencjalnie pomóc w debugowaniu spowolnienia zadań.
Metryki dla określonego wystąpienia magazynu stanowego są oznaczone ich identyfikatorem partycji i nazwą magazynu, co zapewnia ich rozdzielenie. Wszystkie inne metryki są agregowane (suma) dla każdego operatora stanu w ramach zadania, we wszystkich etapach, w których uruchomiony jest operator stanu.
Te metryki są częścią mapy w polach customMetrics
, stateOperators
, StreamingQueryProgress
. Poniżej przedstawiono przykład StreamingQueryProgress
w formacie JSON (uzyskany przy użyciu StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Szczegółowe opisy metryk są następujące:
Nazwa metryki | opis |
---|---|
rocksdbCommitWriteBatchLatency | Czas (w milisekundach), jaki zajął proces zastosowania etapowych zapisów w strukturze pamięci (WriteBatch) do natywnej bazy danych RocksDB. |
latencja zatwierdzenia i zapisu RocksDB | Czas (w milisekundach) potrzebny na opróżnienie zmian w pamięci RocksDB na lokalny dysk. |
rocksdbCommitCompactLatency | Czas (w milisekundach) trwający na kompresję podczas zatwierdzania punktu kontrolnego (opcjonalnie). |
rocksdbCommitPauseLatency | Czas (w milisekundach), który zajęło zatrzymanie wątków roboczych działających w tle, na przykład podczas kompaktowania itp., jako część zatwierdzenia punktu kontrolnego. |
Opóźnienie punktu kontrolnego zatwierdzenia w RocksDB | Czas (w ms) potrzebny na utworzenie migawki natywnej bazy danych RocksDB i zapisanie jej w katalogu lokalnym. |
rocksdbCommitFileSyncLatencyMs | Czas (w milisekundach) potrzebny na synchronizację plików związanych z natywną migawką bazy danych RocksDB do zewnętrznego przechowywania (lokalizacja punktu kontrolnego). |
rocksdbGetLatency | Średni czas (w nanosekundach) potrzebny na wykonanie bazowego wywołania natywnego RocksDB::Get . |
rocksdbPutCount | Średni czas (w nanosekundach) zajmowany przez bazowe wywołanie natywne RocksDB::Put . |
rocksdbGetCount | Liczba natywnych wywołań RocksDB::Get (nie obejmuje Gets pochodzących z WriteBatch — partii w pamięci używanej do tymczasowych zapisów). |
rocksdbPutCount | Liczba wywołań natywnych RocksDB::Put (nie obejmuje Puts funkcji WriteBatch — w partii pamięci używanej do przejściowych zapisów). |
rocksdb Łączna Liczba Bajtów Przeczytanych Przez Get | Liczba nieskompresowanych bajtów odczytanych za pośrednictwem wywołań natywnych RocksDB::Get . |
rocksdbTotalBytesWrittenByPut | Liczba nieskompresowanych bajtów napisanych za pośrednictwem wywołań natywnych RocksDB::Put . |
rocksdbReadBlockCacheHitCount | Liczba przypadków użycia natywnej pamięci podręcznej blokowej Bazy danych RocksDB w celu uniknięcia odczytywania danych z dysku lokalnego. |
rocksdbReadBlockCacheMissCount | Liczba przypadków, gdy natywna pamięć podręczna bloków RocksDB nie znalazła danych i konieczne było odczytanie ich z lokalnego dysku. |
rocksdbCałkowitaLiczbaBajtówPrzeczytywanychPodczasKompresji | Liczba bajtów odczytanych z dysku lokalnego przez natywny proces kompaktowania bazy danych RocksDB. |
rocksdb-CałkowitaLiczbaBajtówZapisanychPrzezKompakcję | Liczba bajtów zapisanych na dysku lokalnym przez natywny proces kompaktowania bazy danych RocksDB. |
rocksdbTotalCompactionLatencyMs | Czas (w milisekundach) potrzebny na kompaktowanie bazy danych RocksDB (zarówno w tle, jak i opcjonalne kompaktowanie zainicjowane podczas zatwierdzenia). |
rocksdbWriterStallLatencyMs (latencja zatrzymania zapisu w rocksdb w ms) | Czas (w milisekundach), w którym proces zapisu został zatrzymany z powodu kompaktowania w tle lub opróżniania tablic pamięci do dysku. |
rocksdbCałkowitaLiczbaPrzeczytanychBajtówPrzezIterator | Niektóre operacje z zachowaniem stanu (takie jak zarządzanie czasem w flatMapGroupsWithState lub watermarkowanie w agregacjach okiennych) wymagają odczytywania całych danych w bazie danych przez iterator. Całkowity rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora. |