Compartir vía


¿Qué es el seguimiento de progreso asincrónico?

El seguimiento de progreso asincrónico permite a las canalizaciones de Streaming estructurado controlar el progreso de forma asincrónica y paralela al procesamiento de datos real dentro de un microproceso, lo que reduce la latencia asociada al mantenimiento de la offsetLog y commitLog.

Seguimiento de progreso asincrónico

Nota

El seguimiento de progreso asincrónico no funciona con desencadenadores de Trigger.once o Trigger.availableNow. Si se intenta habilitar esta característica con estos desencadenadores, se produce un error de consulta.

¿Cómo funciona el seguimiento de progreso asincrónico para reducir la latencia?

Structured Streaming se basa en la conservación y administración de desplazamientos como indicadores de progreso para el procesamiento de consultas. La operación de administración de desplazamiento afecta directamente a la latencia de procesamiento, ya que no se puede producir ningún procesamiento de datos hasta que se completen estas operaciones. El seguimiento de progreso asincrónico permite que las canalizaciones de Structured Streaming realicen puntos de control del progreso sin verse afectadas por estas operaciones de gestión de offset.

¿Cuándo debe configurar la frecuencia del punto de control?

Los usuarios pueden configurar la frecuencia con la que se controla el progreso. La configuración predeterminada para la frecuencia del punto de control proporciona un buen rendimiento para la mayoría de las consultas. La configuración de la frecuencia es útil para escenarios en los que las operaciones de administración de desplazamiento se producen a una velocidad más alta de la que se pueden procesar, lo que crea un trabajo pendiente cada vez mayor de las operaciones de administración de desplazamiento. Para poner fin a este trabajo pendiente creciente, el procesamiento de datos se bloquea o ralentiza, lo que básicamente revierte el comportamiento de procesamiento para eliminar las ventajas del seguimiento de progreso asincrónico.

Nota

El tiempo de recuperación de errores aumenta con el aumento del intervalo de puntos de control. En caso de error, una canalización tiene que reprocesar todos los datos antes del punto de control exitoso anterior. Los usuarios pueden considerar este equilibrio entre una latencia menor durante el procesamiento normal y el tiempo de recuperación en caso de error.

¿Qué configuraciones están asociadas al seguimiento de progreso asincrónico?

Opción Valor Predeterminado Descripción
seguimientoDeProgresoAsincrónicoHabilitado verdadero/falso falso habilitar o deshabilitar el seguimiento de progreso asincrónico
asyncProgressTrackingCheckpointIntervalMs Milisegundos 1000 el intervalo en el que realizamos los desplazamientos y las confirmaciones de finalización

¿Cómo pueden los usuarios habilitar el seguimiento de progreso asincrónico?

Los usuarios pueden usar código similar al código siguiente para habilitar esta característica:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Desactivar el seguimiento de progreso asincrónico

Cuando el seguimiento de progreso asincrónico está habilitado, el marco no controla el progreso de cada lote. Para solucionar esto, antes de deshabilitar el seguimiento de progreso asincrónico, procese al menos dos microprocesos con la siguiente configuración:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Detenga la consulta después de que al menos dos microprocesos hayan terminado de procesarse. Ahora puede deshabilitar de forma segura el seguimiento de progreso asincrónico y reiniciar la consulta.

Si ha deshabilitado el seguimiento de progreso asincrónico sin completar este paso, puede producirse el siguiente error:

java.lang.IllegalStateException: batch x doesn't exist

En los registros del controlador, es posible que vea el siguiente error:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Siguiendo las instrucciones de esta sección para deshabilitar el seguimiento de progreso asincrónico le permite solucionar estos errores y reparar la carga de trabajo de streaming.

Limitaciones con el seguimiento de progreso asincrónico

Esta característica tiene las siguientes limitaciones:

  • El seguimiento de progreso asincrónico solo se admite en canalizaciones sin estado cuando se usa Kafka como receptor.
  • La ejecución 'exactamente una vez' para el procesamiento de extremo a extremo no está garantizada con el seguimiento del progreso asincrónico porque los rangos de desplazamiento por lotes se pueden cambiar en caso de fallo. Algunos receptores, como Kafka, nunca proporcionan garantías exactamente una vez.