Руководство. Использование API потоков Apache Kafka в Azure HDInsight
Узнайте, как создать приложение, использующее API для Apache Kafka Streams, и запустить его с помощью Kafka в HDInsight.
В этом руководстве используется приложение для подсчета слов во время потоковой передачи. Оно считывает текстовые данные из раздела Kafka, извлекает отдельные слова, а затем сохраняет слово и количество слов в другом разделе Kafka.
Обработка потока Kafka часто выполняется с помощью Apache Spark. Kafka версии 2.1.1 и 2.4.1 (в HDInsight 4.0 и 5.0) поддерживает API Kafka Потоки. Этот API позволяет преобразовать потоки данных между входными и выходными разделами.
Дополнительные сведения о Потоках Kafka см. в вводной документации на сайте Apache.org.
В этом руководстве описано следующее:
- Изучение кода
- Создание и развертывание приложения.
- Настройка разделов Kafka.
- Выполнение кода
Необходимые компоненты
Кластер Kafka в HDInsight 4.0 или 5.0. Чтобы узнать, как создать кластер Kafka в HDInsight, ознакомьтесь с документом начале работы с Apache Kafka в HDInsight.
Выполните шаги, приведенные в статье Руководство. Использование API производителя и потребителя Apache Kafka. В шагах, описанных в этом документе, используется пример приложения и разделы, созданные в этом руководстве.
Пакет Java Developer Kit (JDK) версии 8 или аналогичный пакет, например OpenJDK.
Средство Apache Maven, установленное согласно инструкций Apache. Maven — система сборки проектов Java.
Клиент SSH. Дополнительные сведения см. в руководстве по подключению к HDInsight (Apache Hadoop) с помощью SSH.
Изучение кода
Пример приложения расположен в подкаталоге Streaming
по адресу https://github.com/Azure-Samples/hdinsight-kafka-java-get-started. Приложение состоит из двух файлов:
- файл
pom.xml
определяет зависимости проекта, версию Java и методы упаковки; - файл
Stream.java
реализует логику потоковой передачи.
Pom.xml
В файле pom.xml
важны следующие элементы:
Зависимости. Этот проект использует API Потоков Kafka, предоставленный в пакете
kafka-clients
. Приведенный ниже код XML определяет эту зависимость:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Запись
${kafka.version}
объявлена в разделе<properties>..</properties>
файлаpom.xml
. Она настроена для версии Kafka кластера HDInsight.Подключаемые модули. Подключаемые модули Maven предоставляют различные возможности. В этом проекте используются следующие подключаемые модули:
- С помощью модуля
maven-compiler-plugin
можно задать для проекта Java версии 8. Для HDInsight 4.0 и 5.0 требуется Java 8. maven-shade-plugin
: используется для создания файла типа uber jar, содержащего это приложение, а также любые зависимости. Также используется для установки точки входа приложения, с помощью которой вы сможете напрямую запускать JAR-файл, не указывая основной класс.
- С помощью модуля
Stream.java
Файл Stream.java использует API потоков для реализации приложения для подсчета слов. Оно считывает данные из раздела Kafka с именем test
и записывает количество слов в раздел с именем wordcounts
.
Следующий код определяет приложение для подсчета слов:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Создание и развертывание примера
Чтобы создать и развернуть проект для Kafka в кластере HDInsight, выполните следующие действия:
Укажите для текущего каталога расположение каталога
hdinsight-kafka-java-get-started-master\Streaming
и выполните следующую команду, чтобы создать пакет JAR:mvn clean package
Эта команда создает пакет в файле
target/kafka-streaming-1.0-SNAPSHOT.jar
.Замените
sshuser
именем пользователя SSH для кластера, аclustername
— именем кластера. Используя следующую команду, скопируйте файлkafka-streaming-1.0-SNAPSHOT.jar
в свой кластер HDInsight. При появлении запроса введите пароль для учетной записи пользователя SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Создание разделов Apache Kafka
Замените
sshuser
именем пользователя SSH для кластера, аCLUSTERNAME
— именем кластера. Откройте SSH-подключение к кластеру, выполнив следующую команду. При появлении запроса введите пароль для учетной записи пользователя SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Установите jq — обработчик командной строки JSON. В открытом сеансе SSH-подключения введите следующую команду для установки
jq
:sudo apt -y install jq
Настройте переменную пароля. Замените
PASSWORD
паролем для входа в кластер, а затем введите следующую команду:export PASSWORD='PASSWORD'
Извлеките имя кластера с правильным регистром. Фактический регистр имени кластера может отличаться от ожидаемого, в зависимости от способа создания кластера. Эта команда получает фактический регистр, а затем сохраняет его в переменной. Введите следующую команду:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Примечание.
Если вы выполняете этот процесс вне кластера, используйте другой способ хранения имени кластера. Получите имя кластера в нижнем регистре на портале Azure. Затем измените имя кластера на
<clustername>
в следующей команде и выполните ее:export clusterName='<clustername>'
.Чтобы получить узлы Apache Zookeeper и брокера Kafka, используйте приведенные ниже команды. При появлении запроса введите пароль для учетной записи администратора, чтобы войти на кластер.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Примечание.
Для этих команд требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните следующие команды на компьютере с доступом к Ambari.
Чтобы создать разделы для операции потоковой передачи, используйте следующие команды:
Примечание.
Вы можете получить сообщение-ошибку о том, что раздел
test
уже существует. Это нормально, так как он, возможно, был создан в руководстве по API производителя и потребителя./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Разделы используются для следующих целей:
test
. В этот раздел поступают записи. Здесь приложение потоковой передачи считывает их.wordcounts
. В этом разделе приложение потоковой передачи хранит свои выходные данные.RekeyedIntermediateTopic
. Этот раздел используется для секционирования данных, так как счетчик обновляется операторомcountByKey
.wordcount-example-Counts-changelog
. Этот раздел является хранилищем состояний, используемым операциейcountByKey
.
Кроме того, Kafka в HDInsight можно настроить на автоматическое создание разделов. Дополнительные сведения см. в статье How to configure Apache Kafka on HDInsight to automatically create topics (Настройка автоматического создания разделов в Apache Kafka в HDInsight).
Выполнение кода
Для запуска приложения потоковой передачи в качестве фонового процесса используйте следующую команду:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Вы можете получить предупреждение об Apache
log4j
. Его можно проигнорировать.Чтобы отправить записи в раздел
test
, используйте следующую команду для запуска приложения-отправителя:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
После завершения работы отправителя просмотрите сведения, хранящиеся в разделе
wordcounts
, с помощью следующей команды:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
В соответствии с параметрами
--property
объект-получатель консоли печатает ключ (машинное слово) и число (значение). Кроме того, этот параметр настраивает десериализатор, используемый при считывании этих значений из Kafka.Результат будет аналогичен приведенному ниже:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Параметр
--from-beginning
настраивает запуск объекта-получателя в начале записей, хранящихся в разделе. Число увеличивается каждый раз, когда встречается слово, поэтому раздел содержит несколько записей для каждого слова с увеличивающимся числом.Нажмите клавиши Ctrl+C, чтобы закрыть отправитель. Снова нажмите клавиши Ctrl+C, чтобы выйти из приложения и объекта-получателя.
Чтобы удалить разделы для операции потоковой передачи, используйте следующие команды:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Очистка ресурсов
Чтобы очистить ресурсы, созданные при работе с этим руководством, удалите группу ресурсов. При этом будет удален связанный кластер HDInsight и другие ресурсы, связанные с этой группой ресурсов.
Чтобы удалить группу ресурсов с помощью портала Azure, сделайте следующее:
- На портале Azure разверните меню слева, чтобы открыть меню служб, а затем выберите Группы ресурсов, чтобы просмотреть список групп ресурсов.
- Найдите группу ресурсов, которую нужно удалить, и щелкните правой кнопкой мыши кнопку Дополнительно (…) справа от списка.
- Выберите Удалить группу ресурсов и подтвердите выбор.
Следующие шаги
Из этого документа вы узнали, как использовать API для Apache Kafka Streams с Kafka в HDInsight. Дополнительные сведения о работе с Kafka см. в следующих материалах.