Delen via


Controlepunten voor gestructureerd streamen

Controlepunten en write-ahead-logboeken werken samen om verwerkingsgaranties te bieden voor structured streaming-workloads. Het controlepunt houdt de informatie bij die de query identificeert, inclusief statusinformatie en verwerkte records. Wanneer u de bestanden in een controlepuntmap verwijdert of naar een nieuwe controlepuntlocatie overgaat, wordt de volgende uitvoering van de query vernieuwd.

Elke query moet een andere controlepuntlocatie hebben. Meerdere query's mogen nooit dezelfde locatie delen.

Controlepunten inschakelen voor structured streaming-query's

U moet de checkpointLocation optie opgeven voordat u een streamingquery uitvoert, zoals in het volgende voorbeeld:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Notitie

Sommige sinks, zoals de uitvoer voor display() in notebooks en de memory sink, genereren automatisch een tijdelijke controlepuntlocatie als u deze optie weglaat. Deze tijdelijke controlepuntlocaties garanderen geen fouttolerantie of garanties voor gegevensconsistentie en worden mogelijk niet correct opgeschoond. Databricks raadt aan altijd een controlepuntlocatie voor deze sinks op te geven.

Herstellen na wijzigingen in een Structured Streaming-query

Er gelden beperkingen voor de wijzigingen in een streamingquery tussen opnieuw opstarten vanaf dezelfde controlepuntlocatie. Hier volgen enkele wijzigingen die niet zijn toegestaan of het effect van de wijziging is niet goed gedefinieerd. Voor al deze:

  • De toegestane term betekent dat u de opgegeven wijziging kunt uitvoeren, maar of de semantiek van het effect ervan goed is gedefinieerd, is afhankelijk van de query en de wijziging.
  • De term die niet is toegestaan , betekent dat u de opgegeven wijziging niet moet uitvoeren omdat de opnieuw gestarte query waarschijnlijk mislukt met onvoorspelbare fouten.
  • sdf vertegenwoordigt een streaming DataFrame/Gegevensset die is gegenereerd met sparkSession.readStream.

Typen wijzigingen in Structured Streaming-query's

  • Wijzigingen in het aantal of het type (dat wil gezegd een andere bron) van invoerbronnen: dit is niet toegestaan.
  • Wijzigingen in de parameters van invoerbronnen: Of dit is toegestaan en of de semantiek van de wijziging goed is gedefinieerd, is afhankelijk van de bron en de query. Enkele voorbeelden:
    • Toevoeging, verwijdering en wijziging van frequentielimieten is toegestaan:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Wijzigingen in geabonneerde artikelen en bestanden zijn over het algemeen niet toegestaan omdat de resultaten onvoorspelbaar zijn: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Wijzigingen in het triggerinterval: u kunt triggers tussen incrementele batches en tijdsintervallen wijzigen. Zie Triggerintervallen tussen uitvoeringen wijzigen.
  • Wijzigingen in het type uitvoersink: wijzigingen tussen enkele specifieke combinaties van sinks zijn toegestaan. Dit moet per geval worden gecontroleerd. Enkele voorbeelden:
    • Bestandssink naar Kafka-sink is toegestaan. Kafka ziet alleen de nieuwe gegevens.
    • Kafka-sink naar bestandssink is niet toegestaan.
    • Kafka-sink is gewijzigd in foreach of omgekeerd is toegestaan.
  • Wijzigingen in de parameters van de uitvoersink: Of dit is toegestaan en of de semantiek van de wijziging goed is gedefinieerd, is afhankelijk van de sink en de query. Enkele voorbeelden:
    • Wijzigingen in de uitvoermap van een bestandssink zijn niet toegestaan: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Wijzigingen in het uitvoeronderwerp zijn toegestaan: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Wijzigingen in de door de gebruiker gedefinieerde foreach-sink (dat wil gezegd de ForeachWriter code) zijn toegestaan, maar de semantiek van de wijziging is afhankelijk van de code.
  • Wijzigingen in projectie-/filter-/kaartachtige bewerkingen: sommige gevallen zijn toegestaan. Bijvoorbeeld:
    • Het toevoegen/verwijderen van filters is toegestaan: sdf.selectExpr("a") tot sdf.where(...).selectExpr("a").filter(...).
    • Wijzigingen in projecties met hetzelfde uitvoerschema zijn toegestaan: sdf.selectExpr("stringColumn AS json").writeStream sdf.select(to_json(...).as("json")).writeStream
    • Wijzigingen in projecties met een ander uitvoerschema zijn voorwaardelijk toegestaan: sdf.selectExpr("a").writeStream sdf.selectExpr("b").writeStream dit is alleen toegestaan als de uitvoersink de schemawijziging toestaat van ."b""a"
  • Wijzigingen in stateful bewerkingen: sommige bewerkingen in streamingquery's moeten statusgegevens onderhouden om het resultaat continu bij te werken. Structured Streaming controleert automatisch de statusgegevens naar fouttolerante opslag (bijvoorbeeld DBFS, Azure Blob Storage) en herstelt deze na het opnieuw opstarten. Hierbij wordt echter ervan uitgegaan dat het schema van de statusgegevens hetzelfde blijft bij het opnieuw opstarten. Dit betekent dat wijzigingen (dat wil zeggen toevoegingen, verwijderingen of schemawijzigingen) aan de stateful bewerkingen van een streamingquery niet zijn toegestaan tussen opnieuw opstarten. Hier volgt de lijst met stateful bewerkingen waarvan het schema niet moet worden gewijzigd tussen opnieuw opstarten om ervoor te zorgen dat statusherstel wordt gegarandeerd:
    • Streamingaggregatie: bijvoorbeeld sdf.groupBy("a").agg(...). Eventuele wijzigingen in het aantal of het type groeperingssleutels of aggregaties zijn niet toegestaan.
    • Streamingontdubbeling: bijvoorbeeld sdf.dropDuplicates("a"). Eventuele wijzigingen in het aantal of het type groeperingssleutels of aggregaties zijn niet toegestaan.
    • Stream-stream join: bijvoorbeeld (dus sdf1.join(sdf2, ...) beide invoer wordt gegenereerd met sparkSession.readStream). Wijzigingen in het schema of kolommen voor equi-samenvoegen zijn niet toegestaan. Wijzigingen in jointype (outer of inner) zijn niet toegestaan. Andere wijzigingen in de joinvoorwaarde zijn slecht gedefinieerd.
    • Willekeurige stateful bewerking: bijvoorbeeldsdf.groupByKey(...).mapGroupsWithState(...).sdf.groupByKey(...).flatMapGroupsWithState(...) Elke wijziging in het schema van de door de gebruiker gedefinieerde status en het type time-out is niet toegestaan. Elke wijziging binnen de door de gebruiker gedefinieerde statustoewijzingsfunctie is toegestaan, maar het semantische effect van de wijziging is afhankelijk van de door de gebruiker gedefinieerde logica. Als u echt statusschemawijzigingen wilt ondersteunen, kunt u uw complexe statusgegevensstructuren expliciet coderen/decoderen in bytes met behulp van een coderings-/decoderingsschema dat schemamigratie ondersteunt. Als u bijvoorbeeld uw status opslaat als Avro-gecodeerde bytes, kunt u het Avro-statusschema wijzigen tussen het opnieuw opstarten van de query, omdat hiermee de binaire status wordt hersteld.