Punkty kontrolne przesyłania strumieniowego ze strukturą
Punkty kontrolne i dzienniki z wyprzedzeniem zapisu współpracują ze sobą w celu zapewnienia gwarancji przetwarzania obciążeń przesyłania strumieniowego ze strukturą. Punkt kontrolny śledzi informacje identyfikujące zapytanie, w tym informacje o stanie i przetworzone rekordy. Po usunięciu plików w katalogu punktu kontrolnego lub zmianie na nową lokalizację punktu kontrolnego następny przebieg zapytania rozpoczyna się od nowa.
Każde zapytanie musi mieć inną lokalizację punktu kontrolnego. Wiele zapytań nigdy nie powinno współdzielić tej samej lokalizacji.
Włączanie tworzenia punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą
Należy określić checkpointLocation
opcję przed uruchomieniem zapytania przesyłania strumieniowego, jak w poniższym przykładzie:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Uwaga
Niektóre ujścia, takie jak dane wyjściowe display()
w notesach i ujście memory
, automatycznie generują tymczasową lokalizację punktu kontrolnego, jeśli pominięto tę opcję. Te tymczasowe lokalizacje punktów kontrolnych nie zapewniają żadnej odporności na uszkodzenia ani gwarancji spójności danych i mogą nie zostać prawidłowo wyczyszczone. Usługa Databricks zaleca zawsze określanie lokalizacji punktu kontrolnego dla tych ujść.
Odzyskiwanie po zmianach w zapytaniu przesyłania strumieniowego ze strukturą
Istnieją ograniczenia dotyczące zmian w zapytaniu przesyłanym strumieniowo między ponownymi uruchomieniami z tej samej lokalizacji punktu kontrolnego. Poniżej przedstawiono kilka zmian, które nie są dozwolone lub efekt zmiany nie jest dobrze zdefiniowany. Dla wszystkich z nich:
- Termin dozwolony oznacza, że można wykonać określoną zmianę, ale czy semantyka jej efektu jest dobrze zdefiniowana, zależy od zapytania i zmiany.
- Termin niedozwolony oznacza, że nie należy wykonywać określonej zmiany, ponieważ ponownie uruchomione zapytanie prawdopodobnie nie powiedzie się z nieprzewidywalnymi błędami.
-
sdf
reprezentuje przesyłania strumieniowego ramki danych/zestawu danych wygenerowanego za pomocą poleceniasparkSession.readStream
.
Typy zmian w zapytaniach przesyłania strumieniowego ze strukturą
- Zmiany w liczbie lub typie (czyli innym źródle) źródeł wejściowych: nie jest to dozwolone.
-
Zmiany w parametrach źródeł wejściowych: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od źródła i zapytania. Oto kilka przykładów:
Dodawanie, usuwanie i modyfikowanie limitów szybkości jest dozwolone:
spark.readStream.format("kafka").option("subscribe", "article")
na wartość
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Zmiany w subskrybowanych artykułach i plikach są zwykle niedozwolone, ponieważ wyniki są nieprzewidywalne:
spark.readStream.format("kafka").option("subscribe", "article")
spark.readStream.format("kafka").option("subscribe", "newarticle")
- Zmiany w interwale wyzwalacza: wyzwalacze można zmieniać między partiami przyrostowymi i interwałami czasu. Zobacz Zmienianie interwałów wyzwalacza między przebiegami.
-
Zmiany typu ujścia danych wyjściowych: zmiany między kilkoma konkretnymi kombinacjami ujścia są dozwolone. Należy to zweryfikować na podstawie wielkości liter. Oto kilka przykładów:
- Ujście pliku do ujścia platformy Kafka jest dozwolone. Platforma Kafka będzie widzieć tylko nowe dane.
- Ujście platformy Kafka do ujścia plików nie jest dozwolone.
- Ujście platformy Kafka zostało zmienione na foreach lub na odwrót jest dozwolone.
-
Zmiany parametrów ujścia wyjściowego: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od ujścia i zapytania. Oto kilka przykładów:
- Zmiany w katalogu wyjściowym ujścia pliku nie są dozwolone:
sdf.writeStream.format("parquet").option("path", "/somePath")
sdf.writeStream.format("parquet").option("path", "/anotherPath")
- Zmiany w temacie wyjściowym są dozwolone:
sdf.writeStream.format("kafka").option("topic", "topic1")
sdf.writeStream.format("kafka").option("topic", "topic2")
- Zmiany ujścia zdefiniowanego przez użytkownika (czyli
ForeachWriter
kodu) są dozwolone, ale semantyka zmiany zależy od kodu.
- Zmiany w katalogu wyjściowym ujścia pliku nie są dozwolone:
-
Zmiany w operacjach projekcji/filtru/mapowania: niektóre przypadki są dozwolone. Na przykład: .
- Dodawanie/usuwanie filtrów jest dozwolone:
sdf.selectExpr("a")
dosdf.where(...).selectExpr("a").filter(...)
. - Zmiany w projekcjach z takim samym schematem wyjściowym są dozwolone:
sdf.selectExpr("stringColumn AS json").writeStream
dosdf.select(to_json(...).as("json")).writeStream
. - Zmiany w projekcjach z innym schematem wyjściowym są warunkowo dozwolone: przejście z
sdf.selectExpr("a").writeStream
dosdf.selectExpr("b").writeStream
jest dozwolone tylko wtedy, gdy ujście danych wyjściowych zezwala na zmianę schematu z"a"
na"b"
.
- Dodawanie/usuwanie filtrów jest dozwolone:
-
Zmiany w operacjach stanowych: niektóre operacje w zapytaniach przesyłanych strumieniowo muszą obsługiwać dane stanu w celu ciągłego aktualizowania wyniku. Przesyłanie strumieniowe ze strukturą automatycznie określa dane stanu do magazynu odpornego na błędy (na przykład DBFS, Azure Blob Storage) i przywraca je po ponownym uruchomieniu. Zakłada się jednak, że schemat danych stanu pozostaje taki sam w przypadku ponownych uruchomień. Oznacza to, że wszelkie zmiany (czyli dodawanie, usuwanie lub modyfikacje schematu) w stanowych operacjach zapytania przesyłania strumieniowego są niedozwolone między ponownymi uruchomieniami. Oto lista operacji stanowych, których schemat nie powinien być zmieniany między ponownymi uruchomieniami w celu zapewnienia odzyskiwania stanu:
-
Agregacja przesyłania strumieniowego: na przykład
sdf.groupBy("a").agg(...)
. Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone. -
Deduplikacja przesyłania strumieniowego: na przykład
sdf.dropDuplicates("a")
. Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone. -
łącze strumieni: na przykład
sdf1.join(sdf2, ...)
(tj. oba wejścia są generowane przy użyciusparkSession.readStream
). Zmiany w schemacie lub kolumnach łączących równoważnych są niedozwolone. Zmiany typu sprzężenia (zewnętrzne lub wewnętrzne) są niedozwolone. Inne zmiany w warunku sprzężenia są źle zdefiniowane. -
Dowolna operacja stanowa: na przykład
sdf.groupByKey(...).mapGroupsWithState(...)
lubsdf.groupByKey(...).flatMapGroupsWithState(...)
. Wszelkie zmiany schematu stanu zdefiniowanego przez użytkownika i typ limitu czasu nie są dozwolone. Każda zmiana w funkcji mapowania stanu zdefiniowanego przez użytkownika jest dozwolona, ale semantyczny wpływ zmiany zależy od logiki zdefiniowanej przez użytkownika. Jeśli naprawdę chcesz obsługiwać zmiany schematu stanu, możesz jawnie kodować/dekodować złożone struktury danych stanu do bajtów przy użyciu schematu kodowania/dekodowania, który obsługuje migrację schematu. Jeśli na przykład zapiszesz stan jako bajty zakodowane w formacie Avro, możesz zmienić schemat Avro-state-między ponownym uruchomieniem zapytania, ponieważ spowoduje to przywrócenie stanu binarnego.
-
Agregacja przesyłania strumieniowego: na przykład