Dela via


Konfigurera RocksDB-tillståndslager i Azure Databricks

Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration i SparkSession innan du startar strömningsfrågan.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Du kan aktivera RocksDB på DLT-pipelines. Se Optimera pipeline-konfiguration för tillståndsbaserad bearbetning.

Aktivera kontrollpunkter för ändringsloggar

I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar för att minska kontrollpunktens varaktighet och end-to-end-latens för strukturerade strömmande arbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming.

Traditionellt tar RocksDB State Store ögonblicksbilder och laddar upp datafiler under processen för kontrollpunkting. För att undvika den här kostnaden skriver ändringsloggkontrollpunkter endast poster som har ändrats sedan den senaste kontrollpunkten till varaktig lagring."

Kontrollpunkter för ändringsloggar är inaktiverade som standard. Du kan aktivera kontrollpunkter för ändringsloggar på SparkSession-nivån med hjälp av följande syntax:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Du kan aktivera kontrollpunkter för ändringsloggar i en befintlig dataström och bibehålla tillståndsinformationen som lagras i kontrollpunkten.

Viktigt!

Frågor som har aktiverat kontrollpunkter för ändringsloggar kan bara köras på Databricks Runtime 13.3 LTS och senare. Du kan inaktivera kontrollpunkter för ändringsloggar för att återgå till äldre kontrollpunktsbeteende, men du måste fortsätta att köra dessa frågor på Databricks Runtime 13.3 LTS eller senare. Du måste starta om jobbet för att ändringarna ska kunna genomföras.

Metrik för RocksDB-tillståndslager

Varje operatör för tillståndshantering samlar in metrik relaterade till de tillståndshanteringsåtgärder som utförs på dess RocksDB-instans för att övervaka tillståndslagret och potentiellt hjälpa till med felsökning vid långsam jobbprestanda.

Mått för en specifik instans av tillståndslagret är märkta med deras partitions-ID och lagringsnamn för att säkerställa att de förblir åtskilda. Alla andra mått aggregeras (som summa) per tillståndsoperator i jobbet över alla uppgifter där tillståndsoperatorn körs.

Dessa mått är en del av kartan customMetrics i fälten stateOperators i StreamingQueryProgress. Följande är ett exempel på StreamingQueryProgress i JSON-format (som erhållits med StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Detaljerade beskrivningar av måtten är följande:

Måttnamn beskrivning
rocksdbCommitWriteBatchLatency Tiden (i millis) som togs för att tillämpa stegvis skrivningar i en in-memory-struktur (WriteBatch) till native RocksDB.
rocksdbCommitFlushLatency Tid (i millis) för att spola ändringar i RockDB:s minne till lokal disk.
rocksdbCommitCompactLatency Tiden (i millisekunder) som krävdes för komprimering (valfritt) under genomförandet av kontrollpunkten.
rocksdbCommitPauseLatency Tiden (i millisekunder) det tog att stoppa bakgrundsarbetartrådarna (för kompaktöring med mera) som en del av kontrollpunktsåtgärden.
rocksdbCommitCheckpointLatency Tiden (i millis) det tog att ta en ögonblicksbild av det inbyggda RocksDB och skriva den till en lokal katalog.
rocksdbCommitFileSyncLatencyMs Tiden (i millisekunder) som det tog att synkronisera de inbyggda RocksDB-snapshotrelaterade filerna till en extern lagringsplats (kontrollpunktsplats).
rocksdbGetLatency Genomsnittlig tid (i nanos) som det ursprungliga RocksDB::Get-anropet tog.
rocksdbPutCount Genomsnittlig tid (i nanos) som det ursprungliga RocksDB::Put-anropet tog.
rocksdbGetCount Antal interna RocksDB::Get anrop (inkluderar Gets inte från WriteBatch – i minnesbatch som används för mellanlagring av skrivningar).
rocksdbPutCount Antal inhemska RocksDB::Put anrop (inkluderar inte Puts WriteBatch – en minnesbatch som används för mellanlagring av skrivningar).
rocksdbTotalBytesReadByGet Antal okomprimerade byte som läses via interna RocksDB::Get anrop.
rocksdbTotalBytesWrittenByPut (Totala Bytes Skrivna av Put) Antal okomprimerade byte som skrivits via interna RocksDB::Put anrop.
rocksdbReadBlockCacheHitCount Antal gånger som det interna RocksDB-blockcacheminnet används för att undvika att läsa data från en lokal disk.
rocksdbReadBlockCacheMissCount Antal gånger det inbyggda RocksDB-blockcacheminnet missade och krävde läsning av data från den lokala disken.
rocksdbTotaltBitarLästaVidKompaktering Antal byte som lästs från den lokala disken av den interna RocksDB-komprimeringsprocessen.
rocksdbTotalBytesWrittenByCompaction Antal byte som skrivits till den lokala disken av den interna RocksDB-komprimeringsprocessen.
rocksdbTotalCompactionLatencyMs Tiden (i millis) tog för RocksDB-komprimeringar (både bakgrunds- och den valfria komprimering som initierades under commiten).
rocksdbWriterStallLatencyMs Tid (i millisekunder) skrivprocessen har avstannat på grund av en bakgrundskomprimering eller tömning av "memtables" till disk.
rocksdbTotalBytesReadThroughIterator Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i flatMapGroupsWithState eller vattenstämpling i fönsteraggregeringar) kräver att du läser hela data i DB via iteratorn. Den totala storleken på okomprimerade data som lästs med iteratorn.