Freigeben über


Was ist asynchrone Statusverfolgung?

Wichtig

Dieses Feature befindet sich in der Public Preview.

Durch die asynchrone Statusverfolgung können Pipelines für strukturiertes Streaming den Status asynchron und parallel zur eigentlichen Datenverarbeitung innerhalb eines Mikrobatchs mit Prüfpunkten versehen. Dadurch verringert sich die Latenz im Zusammenhang mit der Verwaltung von offsetLog und commitLog.

Asynchrone Statusverfolgung

Hinweis

Die asynchrone Statusnachverfolgung funktioniert nicht mit Triggern vom Typ Trigger.once oder Trigger.availableNow. Wenn Sie versuchen, das Feature mit diesen Triggern zu aktivieren, tritt ein Abfragefehler auf.

Wie verringert die asynchrone Statusverfolgung die Latenz?

Strukturiertes Streaming basiert auf der Speicherung und Verwaltung von Offsets als Statusindikatoren für die Abfrageverarbeitung. Die Offsetverwaltung wirkt sich direkt auf die Verarbeitungslatenz aus, da keine Datenverarbeitung erfolgen kann, solange diese Vorgänge nicht abgeschlossen sind. Durch die asynchrone Statusverfolgung können Pipelines für strukturiertes Streaming den Status mit Prüfpunkten versehen, ohne durch diese Offsetverwaltungsvorgänge beeinflusst zu werden.

Wann sollten Sie die Häufigkeit von Prüfpunkten konfigurieren?

Benutzer können festlegen, wie häufig der Status mit Prüfpunkten versehen werden soll. Die Standardeinstellungen für die Prüfpunkthäufigkeit bieten einen guten Durchsatz für die meisten Abfragen. Das Konfigurieren der Häufigkeit ist hilfreich für Szenarien, in denen Offsetverwaltungsvorgänge mit einer Rate auftreten, die die Verarbeitungskapazität übersteigt, was zu einem immer größer werdenden Rückstand von Offsetverwaltungsvorgängen führt. Um diesem wachsenden Rückstand entgegenzuwirken, wird die Datenverarbeitung blockiert oder verlangsamt, wodurch im Wesentlichen das Verarbeitungsverhalten zurückgesetzt wird und die Vorteile der asynchronen Statusverfolgung nicht mehr gegeben sind.

Hinweis

Die Zeit für Wiederherstellung nach einem Fehler nimmt mit der Erhöhung der Prüfpunktintervallzeit zu. Im Falle eines Fehlers muss eine Pipeline alle Daten vor dem vorherigen erfolgreichen Prüfpunkt erneut verarbeiten. Benutzer können diesen Kompromiss zwischen der geringeren Latenz während der regulären Verarbeitung und der Wiederherstellungszeit im Fehlerfall abwägen.

Welche Konfigurationen gehören zur asynchronen Statusverfolgung?

Option Wert Standard BESCHREIBUNG
asyncProgressTrackingEnabled TRUE/FALSE false Dient zum Aktivieren oder Deaktivieren der asynchronen Statusverfolgung.
asyncProgressTrackingCheckpointIntervalMs Millisekunden 1000 Das Intervall, in dem Offsets und Abschlusscommits committet werden.

Wie können Benutzer die asynchrone Statusverfolgung aktivieren?

Benutzer können Code wie den folgenden verwenden, um dieses Feature zu aktivieren:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Deaktivieren der asynchronen Statusverfolgung

Wenn die asynchrone Statusverfolgung aktiviert ist, prüft das Framework den Fortschritt nicht für jeden Batch. Verarbeiten Sie vor dem Deaktivieren der asynchronen Statusverfolgung mindestens zwei Mikrobatches mit den folgenden Einstellungen, um dieses Problem zu beheben:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Beenden Sie die Abfrage, nachdem für mindestens zwei Mikrobatches die Verarbeitung abgeschlossen wurde. Jetzt können Sie die asynchrone Statusverfolgung sicher deaktivieren und die Abfrage neu starten.

Wenn Sie die asynchrone Statusverfolgung deaktiviert haben, ohne diesen Schritt auszuführen, tritt möglicherweise der folgende Fehler auf:

java.lang.IllegalStateException: batch x doesn't exist

In den Treiberprotokollen wird möglicherweise der folgende Fehler angezeigt:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Wenn Sie die Anweisungen in diesem Abschnitt zum Deaktivieren der asynchronen Statusverfolgung befolgen, können Sie diese Fehler beheben und Ihre Streamingworkload reparieren.

Einschränkungen bei der asynchronen Statusverfolgung

Für diese Funktion gelten folgende Einschränkungen:

  • Wenn Kafka als Senke verwendet wird, wird die asynchrone Statusverfolgung nur in zustandslosen Pipelines unterstützt.
  • Eine exakt einmalige End-to-End-Verarbeitung ist bei der asynchronen Statusverfolgung nicht garantiert, da sich Offsetbereiche für den Batch im Falle eines Fehlers ändern können. Bei einigen Senken (z. B. Kafka) wird niemals eine exakt einmalige Verarbeitung garantiert.