Udostępnij za pośrednictwem


Odczytywanie informacji o stanie strumieniowania ustrukturyzowanego

W środowisku Databricks Runtime 14.3 LTS lub nowszym na obliczeniach skonfigurowanych z dedykowanym trybem dostępu lub bez izolacji można użyć operacji DataFrame lub funkcji tabelarycznych SQL do wykonywania zapytań dotyczących danych i metadanych stanu przesyłania strumieniowego ze strukturą. Za pomocą tych funkcji można obserwować informacje o stanie przetwarzania zapytań stanowych w Strukturalnym Przesyłaniu Strumieniowym, co może być przydatne do monitorowania i debugowania.

Aby wysyłać zapytania o dane stanu lub metadane, musisz mieć dostęp do odczytu do ścieżki punktu kontrolnego zapytania strumieniowego. Funkcje opisane w tym artykule zapewniają dostęp do danych stanu i metadanych wyłącznie do odczytu. Można tylko korzystać z semantyki odczytu wsadowego do zapytań dotyczących informacji o stanie.

Uwaga

Nie można wykonywać zapytań dotyczących danych o stanie dla potoków DLT, tabel przesyłania strumieniowego ani zmaterializowanych widoków. Nie można wykonywać zapytań dotyczących informacji o stanie przy użyciu bezserwerowych zasobów obliczeniowych lub obliczeń skonfigurowanych w trybie dostępu standardowego.

Odczyt magazynu stanów w Structured Streaming

Możesz odczytać informacje o sklepie stanów dotyczące zapytań Strukturalnego Przesyłania Strumieniowego wykonywanych w dowolnej obsługiwanej wersji Databricks Runtime. Użyj następującej składni:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Obsługiwane są następujące opcjonalne konfiguracje:

Opcja Typ Domyślna wartość opis
batchId Długi najnowszy identyfikator serii Reprezentuje grupę docelową do odczytu. Określ tę opcję, aby wysyłać zapytania o informacje o stanie dla wcześniejszego stanu zapytania. Partia musi zostać zatwierdzona, ale nie została jeszcze wyczyszczona.
operatorId Długi 0 Reprezentuje operatora docelowego do odczytu. Ta opcja jest używana, gdy zapytanie używa wielu operatorów stanowych.
storeName String "DEFAULT" Reprezentuje nazwę magazynu stanów, z którego można odczytać. Ta opcja jest używana, gdy operator stanowy używa wielu wystąpień magazynu stanów. Należy określić storeName lub joinSide dla sprzężenia strumieniowo-parowego, ale nie obu.
joinSide Ciąg znaków ("lewy" lub "prawy") Reprezentuje stronę, z której należy odczytywać. Ta opcja jest używana, gdy użytkownicy chcą odczytać stan z połączenia strumień-strumień.

Zwrócone dane mają następujący schemat:

Kolumna Typ opis
key Struktura (dalszy typ pochodzący z klucza stanu) Klucz rekordu operatora stanowego w punkcie kontrolnym stanu.
value Struktura (dalszy typ pochodzący z wartości stanu) Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu.
partition_id Integer Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego.

Odczyt metadanych stanu Structured Streaming

Ważne

Aby rejestrować metadane stanu, należy uruchamiać zapytania przesyłane strumieniowo w środowisku Databricks Runtime 14.2 lub nowszym. Pliki metadanych stanu nie przerywają zgodności z poprzednimi wersjami. Jeśli zdecydujesz się uruchomić zapytanie przesyłania strumieniowego w środowisku Databricks Runtime 14.1 lub nowszym, istniejące pliki metadanych stanu są ignorowane i nie są zapisywane żadne nowe pliki metadanych stanu.

Możesz odczytać informacje o metadanych stanu dla uporządkowanych zapytań przesyłania strumieniowego uruchamianych w środowisku Databricks Runtime 14.2 lub nowszym. Użyj następującej składni:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Zwrócone dane mają następujący schemat:

Kolumna Typ opis
operatorId Integer Całkowity identyfikator operatora stanowego przesyłania strumieniowego.
operatorName Integer Nazwa stanowego operatora przesyłania strumieniowego.
stateStoreName String Nazwa magazynu stanowego operatora.
numPartitions Integer Liczba partycji repozytorium stanu.
minBatchId Długi Minimalny identyfikator partii dostępny do wykonywania zapytań o stan.
maxBatchId Długi Maksymalny identyfikator partii dostępny dla stanu wykonywania zapytań.

Uwaga

Wartości identyfikatorów partii dostarczone przez minBatchId i maxBatchId odzwierciedlają stan w momencie zapisu punktu kontrolnego. Stare partie są automatycznie czyszczone w wyniku mikroprzetwarzania partii, więc podana tutaj wartość nie jest gwarantowana, że będzie nadal dostępna.