Поделиться через


Оптимизация производительности для кластеров Apache Kafka HDInsight

В этой статье приведены рекомендации по оптимизации производительности рабочих нагрузок Apache Kafka в HDInsight. Основное внимание уделяется настройке параметров производителя, брокера и потребителя. Иногда также необходимо настроить параметры операционной системы, чтобы настроить производительность с высокой нагрузкой. Существуют различные способы измерения производительности, а применяемые оптимизации зависят от потребностей бизнеса.

Обзор архитектуры

Разделы Kafka используются для организации записей. Производители производят записи, и потребители потребляют их. Производители отправляют записи в брокеры Kafka, которые затем сохраняют эти данные. Каждый рабочий узел в кластере HDInsight — это брокер Kafka.

Разделы позволяют распределить записи между брокерами. При считывании записей можно использовать один потребитель на секцию, чтобы обеспечить параллельную обработку данных.

Для дублирования секций между узлами используется репликация. Этот раздел защищает от сбоев узлов (брокера). Одна секция в группе реплик назначается главной. Трафик производителя направляется в ведущую секцию каждого узла в зависимости от состояния, которым управляет ZooKeeper.

Определение своего сценария

Производительность Apache Kafka характеризуется двумя основными параметрами: пропускной способностью и задержкой. Пропускная способность — максимальная скорость обработки данных. Более высокая пропускная способность лучше. Задержка — это время, затрачиваемое на сохранение или получение данных. Более низкая задержка лучше. Определение оптимального баланса между пропускной способностью, задержкой и стоимостью инфраструктуры приложения может быть непростой задачей. Требования к производительности должны соответствовать одной из следующих трех распространенных ситуаций в зависимости от того, требуется ли высокая пропускная способность, низкая задержка или оба:

  • Высокая пропускная способность и низкая задержка. Для этого сценария требуются высокая пропускная способность и низкая задержка (~100 мс). Примером такого сценария является наблюдение за доступностью служб.
  • Высокая пропускная способность и высокая задержка. Этот сценарий требует высокой пропускной способности (~1,5 Гбит/с), но допускает большую задержку (до 250 мс). Примером такого сценария является прием данных телеметрии для процессов, выполняемых почти в реальном времени, таких как обеспечение безопасности и обнаружение вторжений.
  • Низкая пропускная способность и низкая задержка. В этом сценарии требуется низкая задержка (< 10 мс) для обработки данных в режиме реального времени, но допускается более низкая пропускная способность. Примером такого сценария является проверка орфографии и грамматики по сети.

Конфигурации производителя

В следующих разделах описаны некоторые из наиболее важных универсальных свойств конфигурации для оптимизации производительности производителей Kafka. Подробное описание всех свойств конфигурации см. в документации Apache Kafka по настройке производителей.

Размер пакета

Производители Apache Kafka собирают группы сообщений (называемые пакетами), которые отправляются как единое целое для хранения в одной секции хранилища. Размер пакета — это число байтов, которые должны в нем содержаться перед передачей соответствующей группы. Увеличение параметра batch.size может увеличить пропускную способность, поскольку снижает потребность в дополнительной обработке запросов из сети и запросов ввода-вывода. При низкой нагрузке увеличение размера пакета может вести к росту задержки при отправке Kafka, так как производитель ожидает, пока пакет будет готов. При высокой нагрузке рекомендуется увеличивать размер пакета, чтобы повысить пропускную способность и задержку.

Требуемые производителем подтверждения

Требуемый производителем параметр acks определяет число подтверждений, необходимых для заполнения секции, прежде чем запрос на запись будет считаться завершенным. Этот параметр влияет на надежность данных и принимает значение 0, 1 или -1. Значение -1 означает, что для завершения записи должно быть получено подтверждение от всех реплик. Параметр acks = -1 обеспечивает более надежную защиту от потери данных, но также приводит к увеличению задержки и снижению пропускной способности. Если в вашем сценарии требуется более высокая пропускная способность, попробуйте установить acks = 0 или acks = 1. Помните, что подтверждение не всех реплик может снизить надежность данных.

Сжатие

Производитель Kafka можно настроить для сжатия сообщений перед их отправкой брокерам. Параметр compression.type определяет используемый кодек сжатия. Поддерживаются кодеки сжатия gzip, snappy и lz4. Сжатие является полезной функцией, которую можно применить при наличии ограничений на емкость диска.

Из двух часто используемых кодеков сжатия gzip и snappy более высоким коэффициентом сжатия отличается gzip, и это снижает требования к наличию места на диске за счет увеличения нагрузки на ЦП. Кодек snappyобеспечивает меньшее сжатие с меньшей дополнительной нагрузкой на ЦП. Вы можете выбрать кодек на основе ограничений на дисковую подсистему брокера или процессор на производителе. gzip может сжимать данные в пять раз быстрее, чем snappy.

Сжатие данных увеличивает количество записей, которые можно хранить на диске. Оно также может увеличить нагрузку на ЦП при несоответствии форматов сжатия, используемых производителем и брокером, так как данные сжимаются перед отправкой, а затем распаковываются перед обработкой.

Параметры брокера

В следующих разделах описаны некоторые из наиболее важных параметров для оптимизации производительности брокеров Kafka. Подробное описание всех параметров брокера см. в документации Apache Kafka по настройке брокеров.

Число дисков

Для дисков хранилища ограничено число операций ввода-вывода в секунду и байтов чтения/записи в секунду. При создании секций Kafka сохраняет каждую новую секцию на диске с минимальным количеством существующих, чтобы сбалансированно распределить их по доступным дискам. Несмотря на любую стратегию хранения, при обработке сотен реплик секций на каждом диске Kafka может легко исчерпать доступную пропускную способность диска. Компромисс заключается в выборе между пропускной способностью и ценой. Если для сценария требуется более высокая пропускная способность, создайте кластер с большим числом управляемых дисков на брокер. В настоящее время HDInsight не поддерживает добавление управляемых дисков в работающий кластер. Дополнительные сведения о настройке количества управляемых дисков см. в статье Настройка объема хранилища и уровня масштабируемости для Apache Kafka в HDInsightt. Проанализируйте рост затрат при увеличении дискового пространства узлов в кластере.

Количество разделов и секций.

Производители Kafka записывают данные в разделы. Потребители Kafka считывают данные из разделов. Раздел связан с журналом, который представляет собой структуру данных на диске. Kafka добавляет полученные от производителей записи в конец журнала раздела. Журнал разделов состоит из множества секций, распределенных по нескольким файлам. Эти файлы, в свою очередь, распределяются по нескольким узлам кластера Kafka. Потребители считывают данные из разделов Kafka в течение выделенных им периодов и могут выбирать позиции (смещение) в журнале раздела.

Каждая секция Kafka является файлом журнала в системе, а потоки-производители могут записывать данные в несколько журналов одновременно. Аналогично, поскольку каждый поток-потребитель считывает сообщения из одной секции, их считывание из нескольких секций также осуществляется в параллельном режиме.

Увеличение плотности секций (число секций на брокер) повышает дополнительную нагрузку, связанную с операциями с метаданными и посекционными запросами/ответами между главной секцией и его подписчиками. Даже в отсутствие проходящего через них потока данных реплики секций все равно получают данные от главных секций, что приводит к дополнительной обработке запросов отправки и получения в сети.

Для кластеров Apache Kafka 2.1 и 2.4 и, как отмечалось ранее в HDInsight, рекомендуется иметь не более 2000 секций на брокер, включая реплика. Увеличение числа секций на брокер снижает пропускную способность и может становиться причиной недоступности раздела. Дополнительные сведения о поддержке секций Kafka Apache см. в официальной записи блога Apache Kafka о увеличении поддерживаемого числа секций в версии 1.1.0. Дополнительные сведения об изменении разделов см. в статье об изменении разделов Apache Kafka.

Количество реплик

Более высокий коэффициент репликации ведет к увеличению числа запросов между главной секцией и подписчиками. Вследствие этого более высокий коэффициент репликации ведет к увеличению потребляемого дискового пространства и ресурсов ЦП для обработки дополнительных запросов, что увеличивает задержку записи и уменьшает пропускную способность.

Мы рекомендуем использовать для Kafka в Azure HDInsight по крайней мере 3-кратную репликацию. В большинстве регионов Azure есть три домена сбоя, но в регионах с двумя доменами сбоя пользователи должны использовать 4-кратную репликацию.

Дополнительные сведения о репликации см. в статьях о репликации и увеличении коэффициента репликации Apache Kafka.

Конфигурации потребителя

В следующем разделе описаны некоторые важные универсальные конфигурации для оптимизации производительности потребителей Kafka. Подробное описание всех свойств конфигурации см. в документации Apache Kafka по настройке потребителей.

Количество потребителей

Рекомендуется, чтобы количество секций равнялось количеству потребителей. Если число потребителей меньше количества секций, то несколько потребителей считываются из нескольких разделов, увеличивая задержку потребителя.

Если число потребителей больше числа секций, то вы используете ресурсы потребителей, так как эти потребители неактивны.

Избегайте частого перераспределения потребителей

Перераспределение потребителей инициируется изменением владения секцией (т. е. потребители масштабируются в большую ил меньшую сторону), сбоем брокера (так как брокеры являются координаторами групп для групп потребителей), сбоем потребителя, добавлением нового раздела или добавлением новых секций. Во время повторного распределения пользователи не могут получать услуги, что увеличивает задержку.

Потребители считаются живыми, если могут отправить пакет пульса брокер в течение session.timeout.ms. В противном случае потребитель считается мертвым или не удалось. Эта задержка приводит к перебалансу потребителей. Чем ниже потребитель session.timeout.ms, тем быстрее мы можем обнаружить эти сбои.

Если значение session.timeout.ms слишком низко, потребитель может столкнуться с повторными ненужными перераспределениями из-за сценариев, например, когда обработка пакета сообщений занимает больше времени или когда приостановка сборки мусора виртуальной машины Java занимает слишком много времени. Если у вас есть потребитель, который тратит слишком много времени на обработку сообщений, эту проблему можно решить, увеличив верхнюю границу времени, в течение которого потребитель может бездействовать, прежде чем получить дополнительные записи с помощью max.poll.interval.ms, или путем уменьшения максимального размера пакетов, возвращаемых с параметром конфигурации max.poll.records.

Пакетная обработка

Как и производители, мы можем добавлять для потребителей пакетную обработку. Количество потребителей данных, которые могут быть получены в каждом запросе на получение, можно настроить, изменив конфигурацию fetch.min.bytes. Этот параметр определяет минимальное количество байтов, ожидаемое от ответа потребителя. Увеличение этого значения снижает количество запросов на получение, сделанных брокеру, поэтому снижает дополнительные затраты. По умолчанию используется значение 1. Аналогичным образом существует другая конфигурация fetch.max.wait.ms. Если запрос на получение не имеет достаточно сообщений в соответствии с размером fetch.min.bytes, он ожидает истечения срока ожидания на основе этой конфигурации fetch.max.wait.ms.

Примечание.

В некоторых случаях потребители могут показаться слишком медленными, когда не удается обработать сообщение. Если смещение после исключения не фиксируется, потребитель будет задерживаться в определенном смещении в бесконечном цикле и не будет двигаться вперед, что приведет к увеличению запаздывания на стороне потребителя.

Настройка ОС Linux с высокой рабочей нагрузкой

Карты памяти

vm.max_map_count определяет максимальное количество карт памяти, которое может иметь процесс. По умолчанию на виртуальной машине кластера HDInsight Apache Kafka под управлением Linux значение равно 65 535.

В Apache Kafka каждый сегмент журнала требует пары файлов index/timeindex, и каждый из этих файлов использует одну mmap. Другими словами, каждый сегмент журнала использует два mmap. Таким образом, если каждая секция размещает один сегмент журнала, требуется не менее двух mmap. Количество сегментов журнала на секцию зависит от размера сегмента, интенсивности нагрузки, политики хранения, периодической точки и, как правило, превышает одну единицу. Mmap value = 2*((partition size)/(segment size))*(partitions)

Если требуемое значение карт памяти превышает vm.max_map_count, брокер вызовет исключение "Сбой карты".

Чтобы избежать этого исключения, используйте приведенные ниже команды, чтобы проверить размер карт памяти в виртуальной машине и при необходимости увеличить его для каждого рабочего узла.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Примечание.

Будьте внимательны при установке слишком высокого значения для этого параметра, так как он занимает память на виртуальной машине. Объем памяти, который может использоваться объектом виртуальной машины Java в картах памяти, определяется параметром MaxDirectMemory. Значение по умолчанию — 64 МБ. Достичь этого значения возможно. Это значение можно увеличить, добавив -XX:MaxDirectMemorySize=amount of memory used в параметры виртуальной машины Java через Ambari. Следите за объемом памяти, используемом на узле, и наличием достаточного объема оперативной памяти для поддержки этой возможности.

Следующие шаги