Udostępnij za pośrednictwem


Punkty kontrolne przesyłania strumieniowego ze strukturą

Punkty kontrolne i dzienniki z wyprzedzeniem zapisu współpracują ze sobą w celu zapewnienia gwarancji przetwarzania obciążeń przesyłania strumieniowego ze strukturą. Punkt kontrolny śledzi informacje identyfikujące zapytanie, w tym informacje o stanie i przetworzone rekordy. Po usunięciu plików w katalogu punktu kontrolnego lub zmianie na nową lokalizację punktu kontrolnego następny przebieg zapytania rozpoczyna się od nowa.

Każde zapytanie musi mieć inną lokalizację punktu kontrolnego. Wiele zapytań nigdy nie powinno współdzielić tej samej lokalizacji.

Włączanie tworzenia punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą

Należy określić checkpointLocation opcję przed uruchomieniem zapytania przesyłania strumieniowego, jak w poniższym przykładzie:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Uwaga

Niektóre ujścia, takie jak dane wyjściowe display() w notesach i ujście memory, automatycznie generują tymczasową lokalizację punktu kontrolnego, jeśli pominięto tę opcję. Te tymczasowe lokalizacje punktów kontrolnych nie zapewniają żadnej odporności na uszkodzenia ani gwarancji spójności danych i mogą nie zostać prawidłowo wyczyszczone. Usługa Databricks zaleca zawsze określanie lokalizacji punktu kontrolnego dla tych ujść.

Odzyskiwanie po zmianach w zapytaniu przesyłania strumieniowego ze strukturą

Istnieją ograniczenia dotyczące zmian w zapytaniu przesyłanym strumieniowo między ponownymi uruchomieniami z tej samej lokalizacji punktu kontrolnego. Poniżej przedstawiono kilka zmian, które nie są dozwolone lub efekt zmiany nie jest dobrze zdefiniowany. Dla wszystkich z nich:

  • Termin dozwolony oznacza, że można wykonać określoną zmianę, ale czy semantyka jej efektu jest dobrze zdefiniowana, zależy od zapytania i zmiany.
  • Termin niedozwolony oznacza, że nie należy wykonywać określonej zmiany, ponieważ ponownie uruchomione zapytanie prawdopodobnie nie powiedzie się z nieprzewidywalnymi błędami.
  • sdf reprezentuje przesyłania strumieniowego ramki danych/zestawu danych wygenerowanego za pomocą polecenia sparkSession.readStream.

Typy zmian w zapytaniach przesyłania strumieniowego ze strukturą

  • Zmiany w liczbie lub typie (czyli innym źródle) źródeł wejściowych: nie jest to dozwolone.
  • Zmiany w parametrach źródeł wejściowych: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od źródła i zapytania. Oto kilka przykładów:
    • Dodawanie, usuwanie i modyfikowanie limitów szybkości jest dozwolone:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      na wartość

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Zmiany w subskrybowanych artykułach i plikach są zwykle niedozwolone, ponieważ wyniki są nieprzewidywalne: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Zmiany w interwale wyzwalacza: wyzwalacze można zmieniać między partiami przyrostowymi i interwałami czasu. Zobacz Zmienianie interwałów wyzwalacza między przebiegami.
  • Zmiany typu ujścia danych wyjściowych: zmiany między kilkoma konkretnymi kombinacjami ujścia są dozwolone. Należy to zweryfikować na podstawie wielkości liter. Oto kilka przykładów:
    • Ujście pliku do ujścia platformy Kafka jest dozwolone. Platforma Kafka będzie widzieć tylko nowe dane.
    • Ujście platformy Kafka do ujścia plików nie jest dozwolone.
    • Ujście platformy Kafka zostało zmienione na foreach lub na odwrót jest dozwolone.
  • Zmiany parametrów ujścia wyjściowego: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od ujścia i zapytania. Oto kilka przykładów:
    • Zmiany w katalogu wyjściowym ujścia pliku nie są dozwolone: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Zmiany w temacie wyjściowym są dozwolone: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Zmiany ujścia zdefiniowanego przez użytkownika (czyli ForeachWriter kodu) są dozwolone, ale semantyka zmiany zależy od kodu.
  • Zmiany w operacjach projekcji/filtru/mapowania: niektóre przypadki są dozwolone. Na przykład: .
    • Dodawanie/usuwanie filtrów jest dozwolone: sdf.selectExpr("a") do sdf.where(...).selectExpr("a").filter(...).
    • Zmiany w projekcjach z takim samym schematem wyjściowym są dozwolone: sdf.selectExpr("stringColumn AS json").writeStream do sdf.select(to_json(...).as("json")).writeStream.
    • Zmiany w projekcjach z innym schematem wyjściowym są warunkowo dozwolone: przejście z sdf.selectExpr("a").writeStream do sdf.selectExpr("b").writeStream jest dozwolone tylko wtedy, gdy ujście danych wyjściowych zezwala na zmianę schematu z "a" na "b".
  • Zmiany w operacjach stanowych: niektóre operacje w zapytaniach przesyłanych strumieniowo muszą obsługiwać dane stanu w celu ciągłego aktualizowania wyniku. Przesyłanie strumieniowe ze strukturą automatycznie określa dane stanu do magazynu odpornego na błędy (na przykład DBFS, Azure Blob Storage) i przywraca je po ponownym uruchomieniu. Zakłada się jednak, że schemat danych stanu pozostaje taki sam w przypadku ponownych uruchomień. Oznacza to, że wszelkie zmiany (czyli dodawanie, usuwanie lub modyfikacje schematu) w stanowych operacjach zapytania przesyłania strumieniowego są niedozwolone między ponownymi uruchomieniami. Oto lista operacji stanowych, których schemat nie powinien być zmieniany między ponownymi uruchomieniami w celu zapewnienia odzyskiwania stanu:
    • Agregacja przesyłania strumieniowego: na przykład sdf.groupBy("a").agg(...). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone.
    • Deduplikacja przesyłania strumieniowego: na przykład sdf.dropDuplicates("a"). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone.
    • łącze strumieni: na przykład sdf1.join(sdf2, ...) (tj. oba wejścia są generowane przy użyciu sparkSession.readStream). Zmiany w schemacie lub kolumnach łączących równoważnych są niedozwolone. Zmiany typu sprzężenia (zewnętrzne lub wewnętrzne) są niedozwolone. Inne zmiany w warunku sprzężenia są źle zdefiniowane.
    • Dowolna operacja stanowa: na przykład sdf.groupByKey(...).mapGroupsWithState(...) lub sdf.groupByKey(...).flatMapGroupsWithState(...). Wszelkie zmiany schematu stanu zdefiniowanego przez użytkownika i typ limitu czasu nie są dozwolone. Każda zmiana w funkcji mapowania stanu zdefiniowanego przez użytkownika jest dozwolona, ale semantyczny wpływ zmiany zależy od logiki zdefiniowanej przez użytkownika. Jeśli naprawdę chcesz obsługiwać zmiany schematu stanu, możesz jawnie kodować/dekodować złożone struktury danych stanu do bajtów przy użyciu schematu kodowania/dekodowania, który obsługuje migrację schematu. Jeśli na przykład zapiszesz stan jako bajty zakodowane w formacie Avro, możesz zmienić schemat Avro-state-między ponownym uruchomieniem zapytania, ponieważ spowoduje to przywrócenie stanu binarnego.