Przekazywanie komunikatów z platformy Apache Kafka® do bazy danych Azure Cosmos DB dla Apache Cassandra za pomocą Apache Flink® na HDInsight w AKS.
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej w tym ogłoszeniu.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla wersji zapoznawczych usługi Microsoft Azure zawierają więcej warunków prawnych, które dotyczą funkcji usługi Azure w wersji beta, w wersji zapoznawczej lub w inny sposób jeszcze nieudostępnione do ogólnej dostępności. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS . W przypadku pytań lub sugestii dotyczących funkcji, prześlij żądanie dotyczące AskHDInsight z detalami i śledź nas, aby uzyskać więcej aktualizacji na Społeczność Azure HDInsight.
W tym przykładzie użyto Apache Flink do ujścia komunikatów HDInsight dla platformy Apache Kafka do usługi Azure Cosmos DB for Apache Cassandra.
Ten przykład jest widoczny, gdy inżynierowie preferują dane zagregowane w czasie rzeczywistym na potrzeby analizy. Dzięki dostępowi do historycznych zagregowanych danych można tworzyć modele uczenia maszynowego w celu tworzenia szczegółowych informacji lub akcji. Możesz również pozyskiwać dane IoT do platformy Apache Flink, aby agregować dane w czasie rzeczywistym i przechowywać je w usłudze Apache Cassandra.
Warunki wstępne
- Apache Flink 1.17.0 w usłudze HDInsight w usłudze AKS
- Apache Kafka 3.2 w usłudze HDInsight
- Azure Cosmos DB dla Apache Cassandra
- VM z Ubuntu dla środowiska deweloperskiego projektów maven w tej samej sieci VNet co HDInsight w klastrze AKS.
Usługa Azure Cosmos DB dla bazy danych Apache Cassandra
Usługa Azure Cosmos DB dla systemu Apache Cassandra może służyć jako magazyn danych dla aplikacji napisanych dla systemu Apache Cassandra. Ta zgodność oznacza, że przy użyciu istniejących sterowników Apache zgodnych z CQLv4 istniejąca aplikacja Cassandra może teraz komunikować się z interfejsem API dla bazy danych Cassandra.
Aby uzyskać więcej informacji, zobacz następujące linki.
- Azure Cosmos DB dla Apache Cassandra.
- Stworzyć interfejs API dla konta Cassandra w usłudze Azure Cosmos DB.
Uzyskiwanie poświadczeń używa go w kodzie źródłowym usługi Stream:
Implementacja
na maszynie wirtualnej z systemem Ubuntu przygotujemy środowisko deweloperskie
Klonowanie repozytorium przykładów platformy Azure
Zapoznaj się z plikiem README na GitHubie, aby pobrać narzędzie Maven i sklonować to repozytorium przy użyciu Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git
z Azure Samples .
Aktualizowanie projektu maven dla bazy danych Cassandra
Przejdź do folderu projektu maven azure-cosmos-db-cassandra-java-getting-started-main i zaktualizuj zmiany wymagane w tym przykładzie.
ekspert pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.azure.cosmosdb.cassandra</groupId>
<artifactId>cosmosdb-cassandra-examples</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>cosmosdb-cassandra-examples</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Konfiguracja połączenia Cosmos DB dla Apache Cassandra
Musisz zaktualizować nazwę hosta i nazwę użytkownika oraz klucze w poniższym fragmencie kodu.
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>
struktury źródłowej
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr 9 2020 ../
-rw-r--r-- 1 root root 1105 Apr 9 2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr 9 2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/
folder util
CassandraUtils.java
Notatka
Zmiana ssl_keystore_file_path zależy od lokalizacji certyfikatu java. Klaster Apache Flink w usłudze HDInsight w usłudze AKS— ścieżka jest /usr/lib/jvm/msopenjdk-11-jre/lib/security
package com.azure.cosmosdb.cassandra.util;
import com.datastax.driver.core.*;
import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;
/**
* Cassandra utility class to handle the Cassandra Sessions
*/
public class CassandraUtils {
private Cluster cluster;
private Configurations config = new Configurations();
private String cassandraHost = "<cassandra-host-ip>";
private int cassandraPort = 10350;
private String cassandraUsername = "localhost";
private String cassandraPassword = "<cassandra-password>";
private File sslKeyStoreFile = null;
private String sslKeyStorePassword = "<keystore-password>";
/**
* This method creates a Cassandra Session based on the end-point details given in config.properties.
* This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
* If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
* @return Session Cassandra Session
*/
public Session getSession() {
try {
//Load cassandra endpoint details from config.properties
loadCassandraConnectionDetails();
final KeyStore keyStore = KeyStore.getInstance("JKS");
try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
keyStore.load(is, sslKeyStorePassword.toCharArray());
}
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(keyStore, sslKeyStorePassword.toCharArray());
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
.getDefaultAlgorithm());
tmf.init(keyStore);
// Creates a socket factory for HttpsURLConnection using JKS contents.
final SSLContext sc = SSLContext.getInstance("TLSv1.2");
sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());
JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(sc)
.build();
cluster = Cluster.builder()
.addContactPoint(cassandraHost)
.withPort(cassandraPort)
.withCredentials(cassandraUsername, cassandraPassword)
.withSSL(sslOptions)
.build();
return cluster.connect();
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
public Cluster getCluster() {
return cluster;
}
/**
* Closes the cluster and Cassandra session
*/
public void close() {
cluster.close();
}
/**
* Loads Cassandra end-point details from config.properties.
* @throws Exception
*/
private void loadCassandraConnectionDetails() throws Exception {
cassandraHost = config.getProperty("cassandra_host");
cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
cassandraUsername = config.getProperty("cassandra_username");
cassandraPassword = config.getProperty("cassandra_password");
String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
String ssl_keystore_password = config.getProperty("ssl_keystore_password");
// If ssl_keystore_file_path, build the path using JAVA_HOME directory.
if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
String javaHomeDirectory = System.getenv("JAVA_HOME");
if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
throw new Exception("JAVA_HOME not set");
}
ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
}
sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
ssl_keystore_password : sslKeyStorePassword;
sslKeyStoreFile = new File(ssl_keystore_file_path);
if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
}
}
}
Configurations.java
package com.azure.cosmosdb.cassandra.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Configuration utility to read the configurations from properties file
*/
public class Configurations {
private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
private static String PROPERTY_FILE = "config.properties";
private static Properties prop = null;
private void loadProperties() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
if (input == null) {
LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
return;
}
prop = new Properties();
prop.load(input);
}
public String getProperty(String propertyName) throws IOException {
if (prop == null) {
loadProperties();
}
return prop.getProperty(propertyName);
}
}
Przykłady folderu
CassandraSink.java
package com.azure.cosmosdb.cassandra.examples;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;
public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {
@Override
public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
CassandraUtils utils = new CassandraUtils();
Session cassandraSession = utils.getSession();
try {
UserRepository repository = new UserRepository(cassandraSession);
//Insert rows into user table
PreparedStatement preparedStatement = repository.prepareInsertStatement();
repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);
} finally {
if (null != utils) utils.close();
if (null != cassandraSession) cassandraSession.close();
}
}
}
klasa główna : CassandraDemo.java
Notatka
- Zamień adresy IP brokera platformy Kafka na adresy IP brokera klastra platformy Kafka
- Przygotowywanie tematu
- użytkownik
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
- użytkownik
package com.azure.cosmosdb.cassandra.examples;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CassandraDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 1. read kafka message as stream input, update the broker IPs from your Kafka setup
String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("user")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafka.print();
DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
String[] fields = line.split(",");
int v1 = Integer.parseInt(fields[0]);
Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
return tuple3;
}).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));
dataStream.addSink(new CassandraSink());
// 4. run stream
env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
}
}
Budowanie projektu
Uruchom mvn clean install z folderu azure-cosmos-db-cassandra-java-getting-started-main, aby skompilować projekt. To polecenie generuje cosmosdb-cassandra-examples.jar w folderze docelowym.
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root 4096 May 15 03:54 ./
drwxr-xr-x 7 root root 4096 May 15 03:54 ../
drwxr-xr-x 2 root root 4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root 4096 May 15 03:54 classes/
-rw-r--r-- 1 root root 15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root 4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root 4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root 4096 May 15 03:54 maven-status/
Ładowanie pliku jar dla wysyłki zadania Apache Flink
Przekaż plik jar do usługi Azure Storage i użyj wget w webssh
msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar
Przygotowywanie sklepu kluczy i tabeli Cosmos DB
Uruchom klasę UserProfile w folderze /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples, aby utworzyć magazyn kluczy i tabelę usługi Azure Cosmos DB.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar
Przesyłanie tematów z Kafka do Cosmos DB dla Apache Cassandra
Uruchom klasę CassandraDemo, aby przesłać temat Kafka do usługi Cosmos DB dla Apache Cassandra.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar
Walidacja przesyłania zadania w Apache Flink
Sprawdź zadanie w interfejsie Flink Web UI na HDInsight na klastrze AKS.
Tworzenie komunikatów na platformie Kafka
Tworzenie komunikatu w temacie platformy Kafka.
sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random
user_set = [
'John',
'Mike',
'Lucy',
'Tom',
'Machael',
'Lily',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
city_set = [
'Atmore',
'Auburn',
'Bessemer',
'Birmingham',
'Chickasaw',
'Clanton',
'Decatur',
'Florence',
'Greenville',
'Jasper',
'Huntsville',
'Homer',
'Homer'
]
def main():
while True:
unique_id = str(int(time.time()))
if random.randrange(10) < 4:
city = random.choice(city_set[:3])
else:
city = random.choice(city_set)
user = random.choice(user_set)
print(unique_id + "," + user + "," + city )
time.sleep(1)
if __name__ == "__main__":
main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516
Sprawdź tabelę w usłudze Cosmos DB dla Apache Cassandra na portalu Azure
Preferencje
- Azure Cosmos DB dla Apache Cassandra.
- Tworzenie API dla konta Cassandra w usłudze Azure Cosmos DB
- przykładów platformy Azure
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, Apache Cassandra, Cassandra i skojarzone nazwy projektów typu open source są znakami towarowymiFundacji Apache Software Foundation (ASF).