¿Qué es el seguimiento de progreso asincrónico?
Importante
Esta característica está en versión preliminar pública.
El seguimiento de progreso asincrónico permite a las canalizaciones de Structured Streaming controlar el progreso de forma asincrónica y en paralelo al procesamiento de datos real dentro de un microproceso, lo que reduce la latencia asociada al mantenimiento de offsetLog
y commitLog
.
Nota:
El seguimiento asíncrono del progreso no funciona con desencadenadores Trigger.once
o Trigger.availableNow
. Si se intenta habilitar esta característica con estos desencadenadores, se produce un error de consulta.
¿Cómo funciona el seguimiento de progreso asincrónico para reducir la latencia?
Structured Streaming se basa en la conservación y administración de desplazamientos como indicadores de progreso para el procesamiento de consultas. La operación de administración de desplazamiento afecta directamente a la latencia de procesamiento, ya que no se puede producir ningún procesamiento de datos hasta que se completen estas operaciones. El seguimiento de progreso asincrónico permite que las canalizaciones de Structured Streaming avancen en el progreso del punto de comprobación sin verse afectadas por estas operaciones de administración de desplazamiento.
¿Cuándo debe configurar la frecuencia del punto de comprobación?
Los usuarios pueden configurar la frecuencia con la que se controla el progreso. La configuración predeterminada para la frecuencia del punto de comprobación proporciona un buen rendimiento para la mayoría de las consultas. La configuración de la frecuencia es útil para escenarios en los que las operaciones de administración de desplazamiento se producen a una velocidad superior a la que se pueden procesar, lo que crea un trabajo pendiente cada vez mayor de las operaciones de administración de desplazamiento. Para poner fin a este trabajo pendiente creciente, el procesamiento de datos se bloquea o se ralentiza, lo que básicamente revierte el comportamiento de procesamiento para eliminar las ventajas del seguimiento de progreso asincrónico.
Nota:
El tiempo de recuperación de errores aumenta con el aumento en el tiempo del intervalo de puntos de comprobación. En caso de error, una canalización tiene que volver a procesar todos los datos antes del punto de comprobación correcto anterior. Los usuarios pueden considerar este equilibrio entre una latencia menor durante el procesamiento normal y el tiempo de recuperación en caso de error.
¿Qué configuraciones están asociadas con el seguimiento de progreso asincrónico?
Opción | Value | Valor predeterminado | Descripción |
---|---|---|---|
asyncProgressTrackingEnabled | true/false | false | habilitar o deshabilitar el seguimiento de progreso asincrónico |
asyncProgressTrackingCheckpointIntervalMs | milisegundos | 1000 | el intervalo en el que confirmamos los desplazamientos y confirmaciones de finalización |
¿Cómo pueden los usuarios habilitar el seguimiento de progreso asincrónico?
Los usuarios pueden usar código similar al código siguiente para habilitar esta característica:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Desactivación del seguimiento del progreso asincrónico
Cuando se habilita el seguimiento de progreso asincrónico, el marco no controla el progreso de cada lote. Para solucionarlo, antes de deshabilitar el seguimiento del progreso asincrónico, procese al menos dos microprocesos con la siguiente configuración:
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
No detenga la consulta hasta que haya terminado el procesamiento de al menos dos microlotes. Ahora puede deshabilitar de forma segura el seguimiento del progreso asincrónico y reiniciar la consulta.
Si ha deshabilitado el seguimiento del progreso asincrónico sin completar este paso, es posible que aparezca el siguiente error:
java.lang.IllegalStateException: batch x doesn't exist
En los registros del controlador, es posible que vea el siguiente error:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Si sigue las instrucciones de esta sección para deshabilitar el seguimiento del progreso asincrónico, podrá solucionar estos errores y reparar la carga de trabajo de streaming.
Limitaciones con el seguimiento de progreso asincrónico
Esta característica tiene las siguientes limitaciones:
- El seguimiento de progreso asincrónico solo se admite en canalizaciones sin estado cuando se usa Kafka como receptor.
- No se garantiza exactamente un procesamiento de un extremo a otro con seguimiento de progreso asincrónico porque los intervalos de desplazamiento del lote se pueden cambiar en caso de error. Algunos sumideros, como Kafka, nunca ofrecen garantías de exactamente una vez.