Tutorial: Use Apache Kafka streams API in Azure HDInsight

Learn how to create an application that uses the Apache Kafka Streams API and run it with Kafka on HDInsight.

The application used in this tutorial is a streaming word count. It reads text data from a Kafka topic, extracts individual words, and then stores the word and count into another Kafka topic.

Kafka stream processing is often done using Apache Spark. Kafka version 2.1.1 and 2.4.1 (in HDInsight 4.0 and 5.0) supports the Kafka Streams API. This API allows you to transform data streams between input and output topics.

For more information on Kafka Streams, see the Intro to Streams documentation on Apache.org.

In this tutorial, you learn how to:

  • Understand the code
  • Build and deploy the application
  • Configure Kafka topics
  • Run the code

Prerequisites

Understand the code

The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Streaming subdirectory. The application consists of two files:

  • pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Stream.java: This file implements the streaming logic.

Pom.xml

The important things to understand in the pom.xml file are:

  • Dependencies: This project relies on the Kafka Streams API, which is provided by the kafka-clients package. The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: Used to set the Java version used by the project to 8. HDInsight 4.0 and 5.0 requires Java 8.
    • maven-shade-plugin: Used to generate an uber jar that contains this application, and any dependencies. It's also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Stream.java

The Stream.java file uses the Streams API to implement a word count application. It reads data from a Kafka topic named test and writes the word counts into a topic named wordcounts.

The following code defines the word count application:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Build and deploy the example

To build and deploy the project to your Kafka on HDInsight cluster, use the following steps:

  1. Set your current directory to the location of the hdinsight-kafka-java-get-started-master\Streaming directory, and then use the following command to create a jar package:

    mvn clean package
    

    This command creates the package at target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Replace sshuser with the SSH user for your cluster, and replace clustername with the name of your cluster. Use the following command to copy the kafka-streaming-1.0-SNAPSHOT.jar file to your HDInsight cluster. If prompted, enter the password for the SSH user account.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Create Apache Kafka topics

  1. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Open an SSH connection to the cluster, by entering the following command. If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Install jq, a command-line JSON processor. From the open SSH connection, enter following command to install jq:

    sudo apt -y install jq
    
  3. Set up password variable. Replace PASSWORD with the cluster login password, then enter the command:

    export PASSWORD='PASSWORD'
    
  4. Extract correctly cased cluster name. The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. This command obtains the actual casing, and then stores it in a variable. Enter the following command:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Note

    If you're doing this process from outside the cluster, there is a different procedure for storing the cluster name. Get the cluster name in lower case from the Azure portal. Then, substitute the cluster name for <clustername> in the following command and execute it: export clusterName='<clustername>'.

  5. To get the Kafka broker hosts and the Apache Zookeeper hosts, use the following commands. When prompted, enter the password for the cluster login (admin) account.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Note

    These commands require Ambari access. If your cluster is behind an NSG, run these commands from a machine that can access Ambari.

  6. To create the topics used by the streaming operation, use the following commands:

    Note

    You may receive an error that the test topic already exists. This is OK, as it may have been created in the Producer and Consumer API tutorial.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    The topics are used for the following purposes:

    • test: This topic is where records are received. The streaming application reads from here.
    • wordcounts: This topic is where the streaming application stores its output.
    • RekeyedIntermediateTopic: This topic is used to repartition data as the count is updated by the countByKey operator.
    • wordcount-example-Counts-changelog: This topic is a state store used by the countByKey operation

    Kafka on HDInsight can also be configured to automatically create topics. For more information, see the Configure automatic topic creation document.

Run the code

  1. To start the streaming application as a background process, use the following command:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    You may get a warning about Apache log4j. You can ignore this warning.

  2. To send records to the test topic, use the following command to start the producer application:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Once the producer completes, use the following command to view the information stored in the wordcounts topic:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    The --property parameters tell the console consumer to print the key (word) along with the count (value). This parameter also configures the deserializer to use when reading these values from Kafka.

    The output is similar to the following text:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    The parameter --from-beginning configures the consumer to start at the beginning of the records stored in the topic. The count increments each time a word is encountered, so the topic contains multiple entries for each word, with an increasing count.

  4. Use the Ctrl + C to exit the producer. Continue using Ctrl + C to exit the application and the consumer.

  5. To delete the topics used by the streaming operation, use the following commands:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Clean up resources

To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal:

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.

Next steps

In this document, you learned how to use the Apache Kafka Streams API with Kafka on HDInsight. Use the following to learn more about working with Kafka.