Odczytywanie informacji o stanie strumieniowania ustrukturyzowanego
W środowisku Databricks Runtime 14.3 LTS lub nowszym na obliczeniach skonfigurowanych z dedykowanym trybem dostępu lub bez izolacji można użyć operacji DataFrame lub funkcji tabelarycznych SQL do wykonywania zapytań dotyczących danych i metadanych stanu przesyłania strumieniowego ze strukturą. Za pomocą tych funkcji można obserwować informacje o stanie przetwarzania zapytań stanowych w Strukturalnym Przesyłaniu Strumieniowym, co może być przydatne do monitorowania i debugowania.
Aby wysyłać zapytania o dane stanu lub metadane, musisz mieć dostęp do odczytu do ścieżki punktu kontrolnego zapytania strumieniowego. Funkcje opisane w tym artykule zapewniają dostęp do danych stanu i metadanych wyłącznie do odczytu. Można tylko korzystać z semantyki odczytu wsadowego do zapytań dotyczących informacji o stanie.
Uwaga
Nie można wykonywać zapytań dotyczących danych o stanie dla potoków DLT, tabel przesyłania strumieniowego ani zmaterializowanych widoków. Nie można wykonywać zapytań dotyczących informacji o stanie przy użyciu bezserwerowych zasobów obliczeniowych lub obliczeń skonfigurowanych w trybie dostępu standardowego.
Odczyt magazynu stanów w Structured Streaming
Możesz odczytać informacje o sklepie stanów dotyczące zapytań Strukturalnego Przesyłania Strumieniowego wykonywanych w dowolnej obsługiwanej wersji Databricks Runtime. Użyj następującej składni:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Obsługiwane są następujące opcjonalne konfiguracje:
Opcja | Typ | Domyślna wartość | opis |
---|---|---|---|
batchId |
Długi | najnowszy identyfikator serii | Reprezentuje grupę docelową do odczytu. Określ tę opcję, aby wysyłać zapytania o informacje o stanie dla wcześniejszego stanu zapytania. Partia musi zostać zatwierdzona, ale nie została jeszcze wyczyszczona. |
operatorId |
Długi | 0 | Reprezentuje operatora docelowego do odczytu. Ta opcja jest używana, gdy zapytanie używa wielu operatorów stanowych. |
storeName |
String | "DEFAULT" | Reprezentuje nazwę magazynu stanów, z którego można odczytać. Ta opcja jest używana, gdy operator stanowy używa wielu wystąpień magazynu stanów. Należy określić storeName lub joinSide dla sprzężenia strumieniowo-parowego, ale nie obu. |
joinSide |
Ciąg znaków ("lewy" lub "prawy") | Reprezentuje stronę, z której należy odczytywać. Ta opcja jest używana, gdy użytkownicy chcą odczytać stan z połączenia strumień-strumień. |
Zwrócone dane mają następujący schemat:
Kolumna | Typ | opis |
---|---|---|
key |
Struktura (dalszy typ pochodzący z klucza stanu) | Klucz rekordu operatora stanowego w punkcie kontrolnym stanu. |
value |
Struktura (dalszy typ pochodzący z wartości stanu) | Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu. |
partition_id |
Integer | Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego. |
Odczyt metadanych stanu Structured Streaming
Ważne
Aby rejestrować metadane stanu, należy uruchamiać zapytania przesyłane strumieniowo w środowisku Databricks Runtime 14.2 lub nowszym. Pliki metadanych stanu nie przerywają zgodności z poprzednimi wersjami. Jeśli zdecydujesz się uruchomić zapytanie przesyłania strumieniowego w środowisku Databricks Runtime 14.1 lub nowszym, istniejące pliki metadanych stanu są ignorowane i nie są zapisywane żadne nowe pliki metadanych stanu.
Możesz odczytać informacje o metadanych stanu dla uporządkowanych zapytań przesyłania strumieniowego uruchamianych w środowisku Databricks Runtime 14.2 lub nowszym. Użyj następującej składni:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Zwrócone dane mają następujący schemat:
Kolumna | Typ | opis |
---|---|---|
operatorId |
Integer | Całkowity identyfikator operatora stanowego przesyłania strumieniowego. |
operatorName |
Integer | Nazwa stanowego operatora przesyłania strumieniowego. |
stateStoreName |
String | Nazwa magazynu stanowego operatora. |
numPartitions |
Integer | Liczba partycji repozytorium stanu. |
minBatchId |
Długi | Minimalny identyfikator partii dostępny do wykonywania zapytań o stan. |
maxBatchId |
Długi | Maksymalny identyfikator partii dostępny dla stanu wykonywania zapytań. |
Uwaga
Wartości identyfikatorów partii dostarczone przez minBatchId
i maxBatchId
odzwierciedlają stan w momencie zapisu punktu kontrolnego. Stare partie są automatycznie czyszczone w wyniku mikroprzetwarzania partii, więc podana tutaj wartość nie jest gwarantowana, że będzie nadal dostępna.