Ujście komunikatów platformy Apache Kafka® do usługi Azure Cosmos DB dla systemu Apache Cassandra przy użyciu narzędzia Apache Flink® w usłudze HDInsight w usłudze AKS


Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.


Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

W tym przykładzie użyto narzędzia Apache Flink do ujścia komunikatów usługi HDInsight dla platformy Apache Kafka w usłudze Azure Cosmos DB for Apache Cassandra.

Ten przykład jest widoczny, gdy inżynierowie preferują dane zagregowane w czasie rzeczywistym na potrzeby analizy. Dzięki dostępowi do historycznych zagregowanych danych można tworzyć modele uczenia maszynowego w celu tworzenia szczegółowych informacji lub akcji. Możesz również pozyskiwać dane IoT do platformy Apache Flink, aby agregować dane w czasie rzeczywistym i przechowywać je w usłudze Apache Cassandra.

Wymagania wstępne

Usługa Azure Cosmos DB dla bazy danych Apache Cassandra

Usługa Azure Cosmos DB dla systemu Apache Cassandra może służyć jako magazyn danych dla aplikacji napisanych dla systemu Apache Cassandra. Ta zgodność oznacza, że przy użyciu istniejących sterowników Apache zgodnych z CQLv4 istniejąca aplikacja Cassandra może teraz komunikować się z interfejsem API dla bazy danych Cassandra.

Aby uzyskać więcej informacji, zobacz następujące linki.

Zrzut ekranu przedstawiający sposób tworzenia usługi Azure Cosmos DB dla bazy danych Apache Cassandra w witrynie Azure Portal.

Uzyskiwanie poświadczeń używa go w kodzie źródłowym usługi Stream:

Zrzut ekranu przedstawiający sposób pobierania poświadczeń w kodzie źródłowym strumienia.


Na maszynie wirtualnej z systemem Ubuntu przygotujmy środowisko programistyczne

Klonowanie repozytorium przykładów platformy Azure

Zapoznaj się z plikiem readme usługi GitHub, aby pobrać narzędzie maven i sklonować to repozytorium przy użyciu przykładów Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git platformy Azure.

Aktualizowanie projektu maven dla bazy danych Cassandra

Przejdź do folderu projektu maven azure-cosmos-db-cassandra-java-getting-started-main i zaktualizuj zmiany wymagane w tym przykładzie.

Konfiguracja połączenia usługi Cosmos DB dla bazy danych Apache Cassandra

Musisz zaktualizować nazwę hosta i nazwę użytkownika oraz klucze w poniższym fragmencie kodu.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_port = 10350
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>

struktura źródłowa

Folder narzędzia


Zmiana ssl_keystore_file_path zależy od lokalizacji certyfikatu java. Klaster Apache Flink w usłudze HDInsight w usłudze AKS, ścieżka to /usr/lib/jvm/msopenjdk-11-jre/lib/security

package com.azure.cosmosdb.cassandra.util;

import com.datastax.driver.core.*;

import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;

 * Cassandra utility class to handle the Cassandra Sessions
public class CassandraUtils {

    private Cluster cluster;
    private Configurations config = new Configurations();
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "localhost";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";

     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
    public Session getSession() {

        try {
            //Load cassandra endpoint details from config.properties

            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());

            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory

            // Creates a socket factory for HttpsURLConnection using JKS contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());

            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
            cluster = Cluster.builder()
                    .withCredentials(cassandraUsername, cassandraPassword)

            return cluster.connect();
        } catch (Exception ex) {
        return null;

    public Cluster getCluster() {
        return cluster;

     * Closes the cluster and Cassandra session
    public void close() {

     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");

        // If ssl_keystore_file_path, build the path using JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();

        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;

        sslKeyStoreFile = new File(ssl_keystore_file_path);

        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));


package com.azure.cosmosdb.cassandra.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

 * Configuration utility to read the configurations from properties file
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;

    private void loadProperties() throws IOException {
        InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
        if (input == null) {
            LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
        prop = new Properties();

    public String getProperty(String propertyName) throws IOException {
        if (prop == null) {
        return prop.getProperty(propertyName);


Folder Przykłady


package com.azure.cosmosdb.cassandra.examples;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;

public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {

    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {

            CassandraUtils utils = new CassandraUtils();
            Session cassandraSession = utils.getSession();
            try {
                UserRepository repository = new UserRepository(cassandraSession);

                //Insert rows into user table
                PreparedStatement preparedStatement = repository.prepareInsertStatement();
                repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);

                } finally {
                        if (null != utils) utils.close();
                        if (null != cassandraSession) cassandraSession.close();

klasa główna: CassandraDemo.java


  • Zamień adresy IP brokera platformy Kafka na adresy IP brokera klastra platformy Kafka
  • Przygotowywanie tematu
    • użytkownik /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setValueOnlyDeserializer(new SimpleStringSchema())

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;

        dataStream.addSink(new CassandraSink());

        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");

Kompilowanie projektu

Uruchom polecenie mvn clean install from azure-cosmos-db-cassandra-java-getting-started-main folder , aby skompilować projekt. To polecenie generuje cosmosdb-cassandra-examples.jar w folderze docelowym.

Przekazywanie pliku jar do usługi Azure Storage i wget do protokołu webssh

Przygotowywanie magazynu kluczy i tabeli usługi Cosmos DB

Uruchom klasę UserProfile w folderze /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples, aby utworzyć magazyn kluczy i tabelę usługi Azure Cosmos DB.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar

Tematy ujścia platformy Kafka do usługi Cosmos DB dla systemu Apache Cassandra

Uruchom klasę CassandraDemo, aby ujść temat platformy Kafka w usłudze Cosmos DB dla systemu Apache Cassandra.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar

Zrzut ekranu przedstawiający sposób uruchamiania pokazu bazy danych Cassandra.

Sprawdź zadanie w interfejsie użytkownika sieci Web Flink w usłudze HDInsight w klastrze usługi AKS.

Zrzut ekranu przedstawiający sposób sprawdzania zadania w usłudze HDInsight w interfejsie użytkownika Flink usługi AKS.

Tworzenie komunikatów na platformie Kafka

Tworzenie komunikatu w temacie platformy Kafka.

sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random

user_set = [

city_set = [

def main():
        while True:
                unique_id = str(int(time.time()))
                if random.randrange(10) < 4:
                        city = random.choice(city_set[:3])
                        city = random.choice(city_set)
                user = random.choice(user_set)
                print(unique_id + "," + user + "," + city )

if __name__ == "__main__":
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516

Sprawdzanie tabeli w usłudze Cosmos DB dla bazy danych Apache Cassandra w witrynie Azure Portal

Zrzut ekranu przedstawiający usługę Cosmos DB dla bazy danych Apache Cassandra w witrynie Azure Portal.
