Dela via


Skicka meddelanden från Apache Kafka till Azure Cosmos DB för Apache Cassandra med Apache Flink på HDInsight i AKS

Viktig

Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Läs mer i det här meddelandet.

Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.

Viktig

Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS förhandsversionsinformation. För frågor eller funktionsförslag, skicka en begäran om AskHDInsight med informationen och följ oss för fler uppdateringar om Azure HDInsight Community.

I det här exemplet används Apache Flink för att sänka meddelanden från HDInsight för Apache Kafka till Azure Cosmos DB för Apache Cassandra.

Det här exemplet är framträdande när tekniker föredrar aggregerade data i realtid för analys. Med åtkomst till historiska aggregerade data kan du skapa maskininlärningsmodeller (ML) för att skapa insikter eller åtgärder. Du kan också mata in IoT-data i Apache Flink för att aggregera data i realtid och lagra dem i Apache Cassandra.

Förutsättningar

Azure Cosmos DB för Apache Cassandra

Azure Cosmos DB för Apache Cassandra kan användas som datalager för appar som skrivits för Apache Cassandra. Den här kompatibiliteten innebär att genom att använda befintliga Apache-drivrutiner som är kompatibla med CQLv4 kan ditt befintliga Cassandra-program nu kommunicera med API:et för Cassandra.

Mer information finns i följande länkar.

Skärmbild som visar hur du skapar Azure Cosmos DB för Apache Cassandra på Azure-portalen.

Hämta autentiseringsuppgifter för att använda dem i Stream-källkoden.

Skärmbild som visar hur du hämtar autentiseringsuppgifter för strömmad källkod.

Genomförande

På en virtuell Ubuntu-dator ska vi förbereda utvecklingsmiljön

Klona arkiv av Azure-exempel

Se GitHub-läsme för att ladda ner maven, och klona detta repo med hjälp av Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git från Azure Samples .

Uppdatera maven-projekt för Cassandra

Gå till maven-projektmappen azure-cosmos-db-cassandra-java-getting-started-main och uppdatera de ändringar som krävs för det här exemplet.

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.azure.cosmosdb.cassandra</groupId>
  <artifactId>cosmosdb-cassandra-examples</artifactId>
   <version>1.0-SNAPSHOT</version>
  <dependencies>
            <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-files</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-core</artifactId>
          <version>3.3.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-mapping</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-extras</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.5</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.5</version>
       </dependency>
   </dependencies>
 
   <build>
       <plugins>
           <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
               <configuration>
                   <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                   </descriptorRefs>
                  <finalName>cosmosdb-cassandra-examples</finalName>
                  <appendAssemblyId>false</appendAssemblyId>
               </configuration>
               <executions>
                   <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                       <goals>
                          <goal>single</goal>
                       </goals>
                   </execution>
               </executions>
           </plugin>
           <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
 
</project>

Cosmos DB för Apache Cassandras anslutningskonfiguration

Du måste uppdatera värdnamnet och användarnamnet och nycklarna i kodfragmentet nedan.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>

källstruktur

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr  9  2020 ../
-rw-r--r-- 1 root root 1105 Apr  9  2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr  9  2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/

util-mapp
CassandraUtils.java

Anteckning

Ändra ssl_keystore_file_path beror på java-certifikatplatsen. Apache Flink-kluster på HDInsight och AKS, sökvägen är /usr/lib/jvm/msopenjdk-11-jre/lib/security

package com.azure.cosmosdb.cassandra.util;

import com.datastax.driver.core.*;

import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;

/**
 * Cassandra utility class to handle the Cassandra Sessions
 */
public class CassandraUtils {

    private Cluster cluster;
    private Configurations config = new Configurations();
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "localhost";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";


    /**
     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
     */
    public Session getSession() {

        try {
            //Load cassandra endpoint details from config.properties
            loadCassandraConnectionDetails();

            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());
            }

            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                    .getDefaultAlgorithm());
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
                    .getDefaultAlgorithm());
            tmf.init(keyStore);

            // Creates a socket factory for HttpsURLConnection using JKS contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());

            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
                    .withSSLContext(sc)
                    .build();
            cluster = Cluster.builder()
                    .addContactPoint(cassandraHost)
                    .withPort(cassandraPort)
                    .withCredentials(cassandraUsername, cassandraPassword)
                    .withSSL(sslOptions)
                    .build();

            return cluster.connect();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return null;
    }

    public Cluster getCluster() {
        return cluster;
    }

    /**
     * Closes the cluster and Cassandra session
     */
    public void close() {
        cluster.close();
    }

    /**
     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
     */
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");

        // If ssl_keystore_file_path, build the path using JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            }
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
        }

        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;

        sslKeyStoreFile = new File(ssl_keystore_file_path);

        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
        }
    }
}

Configurations.java

package com.azure.cosmosdb.cassandra.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Configuration utility to read the configurations from properties file
 */
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;

    private void loadProperties() throws IOException {
        InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
        if (input == null) {
            LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
            return;
        }
        prop = new Properties();
        prop.load(input);
    }

    public String getProperty(String propertyName) throws IOException {
        if (prop == null) {
            loadProperties();
        }
        return prop.getProperty(propertyName);

    }
}

Exempelmapp

CassandraSink.java

package com.azure.cosmosdb.cassandra.examples;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;

public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {

            CassandraUtils utils = new CassandraUtils();
            Session cassandraSession = utils.getSession();
            try {
                UserRepository repository = new UserRepository(cassandraSession);

                //Insert rows into user table
                PreparedStatement preparedStatement = repository.prepareInsertStatement();
                repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);

                } finally {
                        if (null != utils) utils.close();
                        if (null != cassandraSession) cassandraSession.close();
                        }
            }
}

huvudklass: CassandraDemo.java

Not

  • Ersätt Kafka Broker IP-adresser med dina Kafka-klusterkoordinator-IP-adresser
  • Förbereda ämne
    • användare /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("user")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafka.print();

        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;
        }).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));


        dataStream.addSink(new CassandraSink());

        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
    }
}

Skapa projektet

Kör mvn clean install från mappen azure-cosmos-db-cassandra-java-getting-started-main för att skapa projektet. Det här kommandot genererar cosmosdb-cassandra-examples.jar under målmappen.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root     4096 May 15 03:54 ./
drwxr-xr-x 7 root root     4096 May 15 03:54 ../
drwxr-xr-x 2 root root     4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root     4096 May 15 03:54 classes/
-rw-r--r-- 1 root root    15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root     4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root     4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root     4096 May 15 03:54 maven-status/

Ladda upp jar till Azure Storage och wget till webssh

msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar

Förbereda Cosmos DB KeyStore och tabell

Kör UserProfile-klassen i /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples för att skapa Azure Cosmos DB:s nyckelarkiv och tabell.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar

Sink Kafka Topics i Cosmos DB för Apache Cassandra

Kör CassandraDemo-klassen för att överföra Kafka-topiken till Cosmos DB för Apache Cassandra.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar

Skärmbild som visar hur du kör Cassandra Demo.

Kontrollera jobbet i Flink-webbgränssnittet i HDInsight på AKS-kluster.

Skärmbild som visar hur du kontrollerar jobbet i HDInsight i AKS Flink-användargränssnittet.

Skapa meddelanden i Kafka

Producera ett meddelande i Kafka-ämnet.

sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random

user_set = [
        'John',
        'Mike',
        'Lucy',
        'Tom',
        'Machael',
        'Lily',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

city_set = [
        'Atmore',
        'Auburn',
        'Bessemer',
        'Birmingham',
        'Chickasaw',
        'Clanton',
        'Decatur',
        'Florence',
        'Greenville',
        'Jasper',
        'Huntsville',
        'Homer',
        'Homer'
]

def main():
        while True:
                unique_id = str(int(time.time()))
                if random.randrange(10) < 4:
                        city = random.choice(city_set[:3])
                else:
                        city = random.choice(city_set)
                user = random.choice(user_set)
                print(unique_id + "," + user + "," + city )
                time.sleep(1)

if __name__ == "__main__":
    main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516

Kontrollera tabellen i Cosmos DB för Apache Cassandra på Azure-portalen

Skärmbild som visar Cosmos DB för Apache Cassandra på Azure-portalen.

Inställningar