Enviar mensagens do Apache Kafka® para o Azure Cosmos DB para Apache Cassandra, utilizando Apache Flink® no HDInsight no AKS.
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre a Comunidade do Azure HDInsight .
Este exemplo usa do Apache Flink para afundar HDInsight para mensagens de do Apache Kafka no ado Azure Cosmos DB para Apache Cassandra.
Este exemplo é proeminente quando os engenheiros preferem dados agregados em tempo real para análise. Com acesso a dados agregados históricos, você pode criar modelos de ML (machine learning) para criar insights ou ações. Você também pode ingerir dados de IoT no Apache Flink para agregar dados em tempo real e armazená-los no Apache Cassandra.
Pré-requisitos
- Apache Flink 1.17.0 no HDInsight no AKS
- Apache Kafka 3.2 no HDInsight
- do Azure Cosmos DB para Apache Cassandra
- Uma VM do Ubuntu para ambiente de desenvolvimento de projetos maven na mesma VNet do HDInsight no cluster do AKS.
Azure Cosmos DB para Apache Cassandra
O Azure Cosmos DB para Apache Cassandra pode ser usado como o repositório de dados para aplicativos gravados para Apache Cassandra. Essa compatibilidade significa que, usando drivers Apache existentes em conformidade com o CQLv4, seu aplicativo Cassandra existente agora pode se comunicar com a API do Cassandra.
Para obter mais informações, consulte os links a seguir.
Obtenha credenciais e use-as no código-fonte do Stream.
Implementação
Em uma VM do Ubuntu, vamos preparar o ambiente de desenvolvimento
Clonando o repositório de exemplos do Azure
Consulte o readme do GitHub para baixar o maven e clone este repositório usando Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git
de Exemplos do Azure .
Atualizando o projeto maven para Cassandra
Vá para a pasta do projeto maven azure-cosmos-db-cassandra-java-getting-started-main e atualize as alterações necessárias para este exemplo.
maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.azure.cosmosdb.cassandra</groupId>
<artifactId>cosmosdb-cassandra-examples</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>cosmosdb-cassandra-examples</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Cosmos DB para a configuração de conexão do Apache Cassandra
Você precisa atualizar o nome do host, o nome de usuário e as chaves no snippet abaixo.
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>
estrutura de origem
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr 9 2020 ../
-rw-r--r-- 1 root root 1105 Apr 9 2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr 9 2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/
pasta util
CassandraUtils.java
Nota
A alteração do caminho ssl_keystore_file_path depende do local do certificado do Java. Cluster do Apache Flink no HDInsight no AKS, o caminho é /usr/lib/jvm/msopenjdk-11-jre/lib/security
package com.azure.cosmosdb.cassandra.util;
import com.datastax.driver.core.*;
import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;
/**
* Cassandra utility class to handle the Cassandra Sessions
*/
public class CassandraUtils {
private Cluster cluster;
private Configurations config = new Configurations();
private String cassandraHost = "<cassandra-host-ip>";
private int cassandraPort = 10350;
private String cassandraUsername = "localhost";
private String cassandraPassword = "<cassandra-password>";
private File sslKeyStoreFile = null;
private String sslKeyStorePassword = "<keystore-password>";
/**
* This method creates a Cassandra Session based on the end-point details given in config.properties.
* This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
* If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
* @return Session Cassandra Session
*/
public Session getSession() {
try {
//Load cassandra endpoint details from config.properties
loadCassandraConnectionDetails();
final KeyStore keyStore = KeyStore.getInstance("JKS");
try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
keyStore.load(is, sslKeyStorePassword.toCharArray());
}
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(keyStore, sslKeyStorePassword.toCharArray());
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
.getDefaultAlgorithm());
tmf.init(keyStore);
// Creates a socket factory for HttpsURLConnection using JKS contents.
final SSLContext sc = SSLContext.getInstance("TLSv1.2");
sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());
JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(sc)
.build();
cluster = Cluster.builder()
.addContactPoint(cassandraHost)
.withPort(cassandraPort)
.withCredentials(cassandraUsername, cassandraPassword)
.withSSL(sslOptions)
.build();
return cluster.connect();
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
public Cluster getCluster() {
return cluster;
}
/**
* Closes the cluster and Cassandra session
*/
public void close() {
cluster.close();
}
/**
* Loads Cassandra end-point details from config.properties.
* @throws Exception
*/
private void loadCassandraConnectionDetails() throws Exception {
cassandraHost = config.getProperty("cassandra_host");
cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
cassandraUsername = config.getProperty("cassandra_username");
cassandraPassword = config.getProperty("cassandra_password");
String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
String ssl_keystore_password = config.getProperty("ssl_keystore_password");
// If ssl_keystore_file_path, build the path using JAVA_HOME directory.
if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
String javaHomeDirectory = System.getenv("JAVA_HOME");
if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
throw new Exception("JAVA_HOME not set");
}
ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
}
sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
ssl_keystore_password : sslKeyStorePassword;
sslKeyStoreFile = new File(ssl_keystore_file_path);
if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
}
}
}
Configurations.java
package com.azure.cosmosdb.cassandra.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Configuration utility to read the configurations from properties file
*/
public class Configurations {
private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
private static String PROPERTY_FILE = "config.properties";
private static Properties prop = null;
private void loadProperties() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
if (input == null) {
LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
return;
}
prop = new Properties();
prop.load(input);
}
public String getProperty(String propertyName) throws IOException {
if (prop == null) {
loadProperties();
}
return prop.getProperty(propertyName);
}
}
exemplos de pasta
CassandraSink.java
package com.azure.cosmosdb.cassandra.examples;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;
public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {
@Override
public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
CassandraUtils utils = new CassandraUtils();
Session cassandraSession = utils.getSession();
try {
UserRepository repository = new UserRepository(cassandraSession);
//Insert rows into user table
PreparedStatement preparedStatement = repository.prepareInsertStatement();
repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);
} finally {
if (null != utils) utils.close();
if (null != cassandraSession) cassandraSession.close();
}
}
}
classe principal: CassandraDemo.java
Nota
- Substitua os IPs do broker do Kafka pelos IPs dos brokers do seu cluster Kafka.
- Preparar tópico
- usuário
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
- usuário
package com.azure.cosmosdb.cassandra.examples;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CassandraDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 1. read kafka message as stream input, update the broker IPs from your Kafka setup
String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("user")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafka.print();
DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
String[] fields = line.split(",");
int v1 = Integer.parseInt(fields[0]);
Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
return tuple3;
}).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));
dataStream.addSink(new CassandraSink());
// 4. run stream
env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
}
}
Compilando o projeto
Execute mvn clean install a partir da pasta azure-cosmos-db-cassandra-java-getting-started-main para compilar o projeto. Esse comando gera cosmosdb-cassandra-examples.jar na pasta de destino.
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root 4096 May 15 03:54 ./
drwxr-xr-x 7 root root 4096 May 15 03:54 ../
drwxr-xr-x 2 root root 4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root 4096 May 15 03:54 classes/
-rw-r--r-- 1 root root 15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root 4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root 4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root 4096 May 15 03:54 maven-status/
Carregando o jar para submissão de tarefa do Apache Flink
Carregar JAR no armazenamento do Azure e usar wget no webssh
msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar
Preparando o KeyStore e a Tabela do Cosmos DB
Execute a classe UserProfile em /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples para criar o repositório de chaves e a tabela do Azure Cosmos DB.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar
Inserir tópicos do Kafka no Cosmos DB para Apache Cassandra
Execute a classe CassandraDemo para transferir o tópico do Kafka para o Cosmos DB no Apache Cassandra.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar
Validar submissão de tarefa do Apache Flink
Verifique o trabalho na UI da Web do Flink no HDInsight no cluster AKS.
Produzindo mensagens no Kafka
Produzir mensagem em tópico Kafka.
sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random
user_set = [
'John',
'Mike',
'Lucy',
'Tom',
'Machael',
'Lily',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
city_set = [
'Atmore',
'Auburn',
'Bessemer',
'Birmingham',
'Chickasaw',
'Clanton',
'Decatur',
'Florence',
'Greenville',
'Jasper',
'Huntsville',
'Homer',
'Homer'
]
def main():
while True:
unique_id = str(int(time.time()))
if random.randrange(10) < 4:
city = random.choice(city_set[:3])
else:
city = random.choice(city_set)
user = random.choice(user_set)
print(unique_id + "," + user + "," + city )
time.sleep(1)
if __name__ == "__main__":
main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516
Confira a tabela no Cosmos DB do Apache Cassandra no Azure Portal
Preferências
- Azure Cosmos DB para Apache Cassandra.
- criar uma conta da API para Cassandra no Azure Cosmos DB
- Amostras do Azure
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, Apache Cassandra, Cassandra e nomes de projeto de software livre associados são marcas comerciais do ASF (Apache Software Foundation).