Freigeben über


Speichern von Apache Kafka®-Nachrichten in Azure Cosmos DB for Apache Cassandra mit Apache Flink® in HDInsight on AKS

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Dieses Beispiel verwendet Apache Flink, um Nachrichten aus HDInsight für Apache Kafka in Azure Cosmos DB for Apache Cassandra zu speichern.

Dieses Beispiel ist von Bedeutung, wenn Techniker*innen aggregierte Echtzeitdaten für die Analyse bevorzugen. Mit Zugriff auf historische aggregierte Daten können Sie ML-Modelle (Machine Learning) erstellen, um Erkenntnisse oder Aktionen zu generieren. Sie können IoT-Daten auch in Apache Flink erfassen, um Daten in Echtzeit zu aggregieren und in Apache Cassandra zu speichern.

Voraussetzungen

Azure Cosmos DB for Apache Cassandra

Azure Cosmos DB for Apache Cassandra kann als Datenspeicher für Apps verwendet werden, die für Apache Cassandra geschrieben wurden. Diese Kompatibilität bedeutet, dass durch Verwendung vorhandener Apache-Treiber, die mit CQLv4 kompatibel sind, Ihre vorhandene Cassandra-Anwendung jetzt mit der API für Cassandra kommunizieren kann.

Weitere Informationen finden Sie unter den folgenden Links.

Screenshot, der zeigt, wie Sie Azure Cosmos DB for Apache Cassandra im Azure-Portal erstellen.

Abrufen von Anmeldeinformationen verwendet sie im Stream-Quellcode:

Screenshot, der zeigt, wie Anmeldeinformationen im Stream-Quellcode abgerufen werden.

Implementierung

Auf einer Ubuntu-VM bereiten wir die Entwicklungsumgebung vor

Klonen-Repository von Azure-Beispielen

Lesen Sie die Readme-Datei auf GitHub, um Maven herunterzuladen, und klonen Sie dieses Repository mit Hilfe von Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git aus Azure-Beispielen.

Aktualisieren des Maven-Projekts für Cassandra

Wechseln Sie zum Maven-Projektordner azure-cosmos-db-cassandra-java-getting-started-main, und aktualisieren Sie die für dieses Beispiel erforderlichen Änderungen.

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.azure.cosmosdb.cassandra</groupId>
  <artifactId>cosmosdb-cassandra-examples</artifactId>
   <version>1.0-SNAPSHOT</version>
  <dependencies>
            <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-files</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-core</artifactId>
          <version>3.3.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-mapping</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-extras</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.5</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.5</version>
       </dependency>
   </dependencies>
 
   <build>
       <plugins>
           <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
               <configuration>
                   <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                   </descriptorRefs>
                  <finalName>cosmosdb-cassandra-examples</finalName>
                  <appendAssemblyId>false</appendAssemblyId>
               </configuration>
               <executions>
                   <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                       <goals>
                          <goal>single</goal>
                       </goals>
                   </execution>
               </executions>
           </plugin>
           <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
 
</project>

Cosmos DB für Apache Cassandras Verbindungskonfiguration

Sie müssen Ihren Hostnamen und Benutzernamen sowie Schlüssel im folgenden Codeausschnitt aktualisieren.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>

Quellstruktur

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr  9  2020 ../
-rw-r--r-- 1 root root 1105 Apr  9  2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr  9  2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/

util-Ordner
CassandraUtils.java

Hinweis

Die Änderung des ssl_keystore_file_path hängt von der Position des Java-Zertifikats ab. Apache Flink-Cluster in HDInsight on AKS (Pfad: /usr/lib/jvm/msopenjdk-11-jre/lib/security)

package com.azure.cosmosdb.cassandra.util;

import com.datastax.driver.core.*;

import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;

/**
 * Cassandra utility class to handle the Cassandra Sessions
 */
public class CassandraUtils {

    private Cluster cluster;
    private Configurations config = new Configurations();
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "localhost";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";


    /**
     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
     */
    public Session getSession() {

        try {
            //Load cassandra endpoint details from config.properties
            loadCassandraConnectionDetails();

            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());
            }

            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                    .getDefaultAlgorithm());
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
                    .getDefaultAlgorithm());
            tmf.init(keyStore);

            // Creates a socket factory for HttpsURLConnection using JKS contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());

            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
                    .withSSLContext(sc)
                    .build();
            cluster = Cluster.builder()
                    .addContactPoint(cassandraHost)
                    .withPort(cassandraPort)
                    .withCredentials(cassandraUsername, cassandraPassword)
                    .withSSL(sslOptions)
                    .build();

            return cluster.connect();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return null;
    }

    public Cluster getCluster() {
        return cluster;
    }

    /**
     * Closes the cluster and Cassandra session
     */
    public void close() {
        cluster.close();
    }

    /**
     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
     */
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");

        // If ssl_keystore_file_path, build the path using JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            }
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
        }

        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;

        sslKeyStoreFile = new File(ssl_keystore_file_path);

        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
        }
    }
}

Configurations.java

package com.azure.cosmosdb.cassandra.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Configuration utility to read the configurations from properties file
 */
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;

    private void loadProperties() throws IOException {
        InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
        if (input == null) {
            LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
            return;
        }
        prop = new Properties();
        prop.load(input);
    }

    public String getProperty(String propertyName) throws IOException {
        if (prop == null) {
            loadProperties();
        }
        return prop.getProperty(propertyName);

    }
}

Ordner für Beispiele

CassandraSink.java

package com.azure.cosmosdb.cassandra.examples;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;

public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {

            CassandraUtils utils = new CassandraUtils();
            Session cassandraSession = utils.getSession();
            try {
                UserRepository repository = new UserRepository(cassandraSession);

                //Insert rows into user table
                PreparedStatement preparedStatement = repository.prepareInsertStatement();
                repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);

                } finally {
                        if (null != utils) utils.close();
                        if (null != cassandraSession) cassandraSession.close();
                        }
            }
}

Hauptklasse: CassandraDemo.java

Hinweis

  • Ersetzen von Kafka-Broker-IPs durch Ihre Kafka-Clusterbroker-IPs
  • Vorbereiten des Themas
    • Benutzer /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("user")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafka.print();

        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;
        }).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));


        dataStream.addSink(new CassandraSink());

        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
    }
}

Erstellen des Projekts

Führen Sie mvn clean install aus dem Ordner azure-cosmos-db-cassandra-java-getting-started-main aus, um das Projekt zu erstellen. Dieser Befehl erzeugt cosmosdb-cassandra-examples.jar im Zielordner.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root     4096 May 15 03:54 ./
drwxr-xr-x 7 root root     4096 May 15 03:54 ../
drwxr-xr-x 2 root root     4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root     4096 May 15 03:54 classes/
-rw-r--r-- 1 root root    15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root     4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root     4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root     4096 May 15 03:54 maven-status/

Hochladen der Jar-Datei in den Azure-Speicher und wget in webssh

msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar

Vorbereiten von Cosmos DB KeyStore und Tabelle

Führen Sie die Klasse UserProfile in /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples aus, um den Keystore und die Tabelle von Azure Cosmos DB zu erstellen.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar

Speichern von Kafka-Themen in Cosmos DB for Apache Cassandra

Führen Sie die Klasse CassandraDemo aus, um Kafka-Themen in Cosmos DB for Apache Cassandra zu speichern.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar

Screenshot, der zeigt, wie Sie CassandraDemo ausführen.

Überprüfen eines Auftrags auf der Flink-Webbenutzeroberfläche in einem HDInsight auf AKS-Cluster.

Screenshot, der zeigt, wie Sie den Auftrag auf der Flink-Benutzeroberfläche von HDInsight auf AKS überprüfen.

Erzeugen von Nachrichten in Kafka

Erzeugen einer Nachricht in einem Kafka-Thema.

sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random

user_set = [
        'John',
        'Mike',
        'Lucy',
        'Tom',
        'Machael',
        'Lily',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

city_set = [
        'Atmore',
        'Auburn',
        'Bessemer',
        'Birmingham',
        'Chickasaw',
        'Clanton',
        'Decatur',
        'Florence',
        'Greenville',
        'Jasper',
        'Huntsville',
        'Homer',
        'Homer'
]

def main():
        while True:
                unique_id = str(int(time.time()))
                if random.randrange(10) < 4:
                        city = random.choice(city_set[:3])
                else:
                        city = random.choice(city_set)
                user = random.choice(user_set)
                print(unique_id + "," + user + "," + city )
                time.sleep(1)

if __name__ == "__main__":
    main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516

Überprüfung der Tabelle auf Cosmos DB für Apache Cassandra auf dem Azure-Portal

Screenshot von Cosmos DB for Apache Cassandra im Azure-Portal.

Einstellungen