Optimización del rendimiento para los clústeres de Apache Kafka HDInsight
En este artículo se proporcionan algunas sugerencias para optimizar el rendimiento de las cargas de trabajo de Apache Kafka en HDInsight. El enfoque se centra en el ajuste de la configuración del productor, agente y consumidor. A veces, también debe ajustar la configuración del sistema operativo para optimizar el rendimiento con cargas de trabajo pesadas. Hay diferentes maneras de medir el rendimiento, y las optimizaciones que aplique dependen de sus necesidades empresariales.
Introducción a la arquitectura
Los temas de Kafka se usan para organizar los registros. Los productores producen registros y los consumidores los consumen. Los productores envían registros a los agentes de Kafka, que luego almacenan los datos. Cada nodo de trabajo del clúster de HDInsight es un agente de Kafka.
Los temas particionan los registros entres los agentes. Al consumir registros, puede usar hasta un consumidor por partición para lograr el procesamiento en paralelo de los datos.
La replicación se usa para duplicar las particiones entre nodos. Esta partición protege contra interrupciones del nodo (agente). Una sola partición entre el grupo de réplicas se designa como el líder de particiones. El tráfico del productor se enruta al líder de cada nodo, con el estado administrado por ZooKeeper.
Identificación del escenario
El rendimiento general de Apache Kafka tiene dos aspectos principales: rendimiento y latencia. El rendimiento es la velocidad máxima a la que se pueden procesar los datos. Un mayor rendimiento es mejor. La latencia es el tiempo que se necesita para que los datos que se almacena o recuperen. Una menor latencia es mejor. Encontrar el equilibrio correcto entre el rendimiento, latencia y coste de la infraestructura de la aplicación puede ser algo desafiante. Los requisitos de rendimiento deberían coincidir con una de las siguientes tres situaciones comunes, en función de si requiere alto rendimiento, baja latencia o ambos:
- Alto rendimiento, baja latencia. Este escenario requiere un alto rendimiento y baja latencia (~ 100 milisegundos). Un ejemplo de este tipo de aplicación es la supervisión de la disponibilidad del servicio.
- Alto rendimiento, alta latencia. Este escenario requiere un alto rendimiento (~1,5 GBps), pero puede tolerar una latencia mayor (< 250 ms). Un ejemplo de este tipo de aplicación es la ingesta de datos de telemetría para procesos casi en tiempo real, como aplicaciones de seguridad y detección de intrusiones.
- Bajo rendimiento, baja latencia. Este escenario requiere una latencia baja (< 10 ms) para el procesamiento en tiempo real, pero puede tolerar un menor rendimiento. Un ejemplo de este tipo de aplicación es las comprobaciones de ortografía y gramática en línea.
Configuraciones de productor
En las secciones siguientes se resaltan algunas de las propiedades de configuración genéricas más importantes para optimizar el rendimiento de los productores de Kafka. Para obtener una explicación detallada de todas las propiedades de configuración, consulte la documentación de Apache Kafka sobre las configuraciones de productor.
Tamaño de lote
Los productores de Apache Kafka reúnen grupos de mensajes (denominados lotes) que se envían como una unidad que se almacenará en una partición de almacenamiento único. El tamaño de lote corresponde al número de bytes que deben estar presentes antes de que se transmita ese grupo. Aumentar el parámetro batch.size
puede aumentar el rendimiento, ya que reduce la sobrecarga de procesamiento de red y las solicitudes de E/S. Bajo una carga ligera, un mayor tamaño de lote puede aumentar la latencia de envío de Kafka mientras el productor espera a que un lote esté preparado. Bajo una carga elevada, se recomienda aumentar el tamaño de lote para mejorar el rendimiento y la latencia.
Confirmaciones requeridas por el productor
La configuración acks
requerida por el productor determina el número de confirmaciones que necesita el líder de particiones antes de que una solicitud de escritura se considere completada. Este valor afecta a la confiabilidad de los datos y toma los valores de 0
, 1
, o -1
. Un valor de -1
significa que se debe recibir una confirmación de todas las réplicas antes de que se complete la operación de escritura. El valor acks = -1
proporciona mayores garantías contra la pérdida de datos, pero también resulta en mayor latencia y menor rendimiento. Si los requisitos de la aplicación exigen un mayor rendimiento, pruebe a establecer acks = 0
o acks = 1
. Tenga en cuenta que no reconocer todas las réplicas puede reducir la confiabilidad de los datos.
Compresión
Un productor de Kafka puede configurarse para comprimir los mensajes antes de enviarlos a los agentes. El valor compression.type
especifica el códec de compresión que se usará. Los códecs de compresión admitidos son "gzip", "snappy" y "lz4". La compresión es beneficiosa y se debe tener en cuenta si hay una limitación en la capacidad de disco.
Entre los dos códecs de compresión que se usan con frecuencia, gzip
y snappy
, gzip
tiene una razón de compresión, lo que resulta en un menor uso de disco a costa de una carga más elevada de la CPU. El códec snappy
proporciona menos compresión con menos sobrecarga de la CPU. Puede decidir qué códec usar en función de las limitaciones de CPU del disco de agente o productor. gzip
puede comprimir los datos a una velocidad cinco veces mayor que snappy
.
La compresión de datos aumenta el número de registros que se pueden almacenar en un disco. También puede aumentar la sobrecarga de la CPU en los casos en los que hay una discrepancia entre los formatos de compresión que usan el productor y el agente, ya que los datos se deben comprimir antes del envío y luego descomprimir antes del procesamiento.
Configuración del agente
En las secciones siguientes se resaltan algunas de las opciones de configuración más importantes para optimizar el rendimiento de los agentes de Kafka. Para obtener una explicación detallada de todos los valores del agente, consulte la documentación de Apache Kafka sobre las configuraciones de agente.
Número de discos
Los discos de almacenamiento tienen un valor de IOPS (operaciones de entrada/salida por segundo) y de bytes de lectura/escritura por segundo limitado. Al crear nuevas particiones, Kafka almacena cada nueva partición en el disco con el menor número de particiones existentes para equilibrarlas entre todos los discos disponibles. A pesar de la estrategia de almacenamiento, al procesar cientos de réplicas de particiones en cada disco, Kafka puede saturar fácilmente el rendimiento de disco disponible. Aquí el término medio es entre el rendimiento y el coste. Si la aplicación requiere mayor rendimiento, cree un clúster con más discos administrados por agente. Actualmente, HDInsight no admite la adición de discos administrados a un clúster en ejecución. Para más información sobre cómo configurar el número de discos administrados, consulte Configuración del almacenamiento y la escalabilidad de Apache Kafka en HDInsight. Debe comprender las implicaciones de coste de aumentar el espacio de almacenamiento para los nodos del clúster.
Número de temas y particiones
Los productores de Kafka escriben en los temas. Los consumidores de Kafka leen de los temas. Un tema está asociado con un registro, que es una estructura de datos en el disco. Kafka anexa los registros de un productor al final de un registro de tema. Un registro de tema consta varias particiones distribuidas entre varios archivos. A su vez, estos archivos se distribuyen entre varios nodos de clúster de Kafka. Los consumidores leen los temas de Kafka a su ritmo y pueden elegir su posición (desplazamiento) en el registro de tema.
Cada partición de Kafka es un archivo de registro en el sistema y los subprocesos de productor pueden escribir en varios registros de manera simultánea. De forma similar, dado que cada subproceso de consumidor lee mensajes de una partición, el consumo desde varias particiones también se controla en paralelo.
Aumentar la densidad de la partición (el número de particiones por agente) agrega una sobrecarga relacionada con las operaciones de metadatos y solicitud/respuesta por partición entre el líder de particiones y sus seguidores. Incluso si no hay datos que fluyen, las réplicas de partición aún capturar datos de los líderes, lo que resulta en un procesamiento adicional para enviar y recibir solicitudes a través de la red.
Para los clústeres de Apache Kafka 2.1 y 2.4, y como se ha indicado anteriormente en HDInsight, recomendamos tener un máximo de 2 000 particiones por agente, incluidas las réplicas. Aumentar el número de particiones por agente reduce el rendimiento y también puede provocar la falta de disponibilidad de temas. Para más información sobre la compatibilidad con particiones de Kafka, consulte la entrada de blog oficial de Apache Kafka sobre el aumento del número de particiones compatibles en la versión 1.1.0. Para más información sobre cómo modificar temas, consulte Apache Kafka: modifying topics (Apache Kafka: modificar temas).
Número de réplicas
Un mayor factor de replicación resulta en solicitudes adicionales entre el líder de particiones y los seguidores. Como consecuencia, un factor de replicación más alto consume más recursos de disco y CPU para controlar las solicitudes adicionales, lo que aumenta la latencia de escritura y reduce el rendimiento.
Recomendamos usar un factor de replicación de al menos 3x para Kafka en Azure HDInsight. La mayoría de las regiones de Azure tienen tres dominios de error, pero en regiones con solo dos dominios de error, los usuarios deben usar un factor de replicación de 4x.
Para más información sobre la replicación, consulte Apache Kafka: replication (Apache Kafka: replicación) y Apache Kafka: increasing replication factor (Apache Kafka: aumentar el factor de replicación).
Configuraciones de consumidor
En la sección siguiente se resaltan algunas configuraciones genéricas importantes para optimizar el rendimiento de los consumidores de Kafka. Para obtener una explicación detallada de todas las configuraciones, consulte la documentación de Apache Kafka sobre las configuraciones de consumidor.
Número de consumidores
Es una buena práctica tener un número de particiones igual al número de consumidores. Si el número de consumidores es menor que el número de particiones, algunos de los consumidores leen de varias particiones, lo que aumenta la latencia del consumidor.
Si el número de consumidores es mayor que el número de particiones, se van a perder recursos de consumidor, ya que esos consumidores están inactivos.
Evitar el reequilibrio frecuente del consumidor
El reequilibrio del consumidor se desencadena por un cambio de propiedad de la partición (es decir, los consumidores se escalan horizontal o verticalmente), un bloqueo de agente (ya que los agentes son coordinadores de grupo para los grupos de consumidores), un bloqueo del consumidor o una incorporación de un nuevo tema o de nuevas particiones. Durante el reequilibrio, los consumidores no pueden consumir, lo que aumenta la latencia.
Los consumidores se consideran activos si pueden enviar un latido a un agente en session.timeout.ms
. De lo contrario, el consumidor se considera inactivo o con errores. Este retraso conduce a un reequilibrio del consumidor. Cuanto más bajo sea el consumidor de session.timeout.ms
, más rápido podremos detectar esos fallos.
Si session.timeout.ms
es demasiado bajo, un consumidor podría experimentar reequilibrios repetidos innecesarios, debido a escenarios como un lote de mensajes que tarda más tiempo en procesarse o una pausa de GC de JVM que tarda demasiado. Si tiene un consumidor que dedica demasiado tiempo al procesamiento de mensajes, puede solucionarlo si aumenta el límite superior de la cantidad de tiempo que un consumidor puede estar inactivo antes de capturar más registros con max.poll.interval.ms
o si reduce el tamaño máximo de lotes devueltos con el parámetro de configuración max.poll.records
.
Lotes
Al igual que los productores, se puede agregar procesamiento por lotes para los consumidores. La cantidad de datos que los consumidores pueden obtener en cada solicitud de captura se puede configurar cambiando la configuración fetch.min.bytes
. Este parámetro define los bytes mínimos esperados de una respuesta de captura de un consumidor. Aumentar este valor reduce el número de solicitudes de captura realizadas al agente, lo que reduce la sobrecarga adicional. De manera predeterminada, este valor es 1. Del mismo modo, hay otra configuración fetch.max.wait.ms
. Si una solicitud de captura no tiene suficientes mensajes según el tamaño de fetch.min.bytes
, espera hasta la expiración del tiempo de espera basándose en esta configuración fetch.max.wait.ms
.
Nota:
En algunos escenarios, los consumidores pueden parecer lentos cuando no se puede procesar el mensaje. Si no confirma el desplazamiento después de una excepción, el consumidor se atascará en un desplazamiento determinado en un bucle infinito y no avanzará, con el resultado de que aumentará el retraso en el lado del consumidor.
Ajuste del sistema operativo Linux con cargas de trabajo pesadas
Asignaciones de memoria
vm.max_map_count
define el número máximo de mmap que puede tener un proceso. De forma predeterminada, en la máquina virtual Linux del clúster de Apache Kafka de HDInsight, el valor es 65535.
En Apache Kafka, cada segmento de registro requiere un par de archivos index/timeindex, y cada uno de estos archivos consume un mmap. En otras palabras, cada segmento de registro usa dos mmap. Por lo tanto, si cada partición hospeda un único segmento de registro, requiere un mínimo de dos mmap. El número de segmentos de registro por partición varía en función del tamaño del segmento, intensidad de la carga, directiva de retención, período sucesivo y, por lo general, tiende a ser más de uno. Mmap value = 2*((partition size)/(segment size))*(partitions)
Si el valor de mmap necesario supera vm.max_map_count
, el agente generaría una excepción "Map failed" (Error de asignación).
Para evitar esta excepción, utilice los siguientes comandos para comprobar el tamaño de mmap en la máquina virtual y aumentar el tamaño si es necesario en cada nodo de trabajo.
# command to find number of index files:
find . -name '*index' | wc -l
# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l
# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>
# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p
Nota
Tenga cuidado al establecer este valor demasiado alto, ya que ocupa memoria en la máquina virtual. La cantidad de memoria que JVM puede usar en asignaciones de memoria viene determinada por el valor MaxDirectMemory
. El valor predeterminado es de 64 MB. Es posible que se alcance este valor. Puede aumentar este valor agregando -XX:MaxDirectMemorySize=amount of memory used
a la configuración de JVM mediante Ambari. Tenga en cuenta la cantidad de memoria que se utiliza en el nodo y si hay suficiente RAM disponible para admitirla.