Ingest Apache Kafka® messages into Azure Cosmos DB for Apache Cassandra using Apache Flink® on HDInsight on AKS
This example uses Apache Flink to sink messages from Apache Kafka on HDInsight into Azure Cosmos DB for Apache Cassandra.
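This example is useful when engineers want real-time aggregated data for analysis. With access to historical aggregated data, you can build machine learning (ML) models that produce insights or actions. You can also ingest IoT data into Apache Flink to aggregate it in real time and store it in Apache Cassandra.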
Prerequisites
- Apache Flink 1.17.0 on HDInsight on AKS
- Apache Kafka 3.2 on HDInsight
- Azure Cosmos DB for Apache Cassandra
- An Ubuntu virtual machine for the Maven project development environment, in the same VNet as the HDInsight on AKS cluster.
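Azure Cosmos DB for Apache Cassandra
Azure Cosmos DB for Apache Cassandra can be used as the data store for applications written for Apache Cassandra. Because of this compatibility, your existing Cassandra applications can communicate with the API for Cassandra by using existing Apache drivers that are compliant with CQLv4.
- Azure Cosmos DB for Apache Cassandra.
- Create an API for Cassandra account in Azure Cosmos DB.
Get the account credentials (contact point, port, username, and password); the stream source code uses them: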
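Prepare the development environment on the Ubuntu VM
Clone the Azure Samples repository
Refer to the GitHub README to download Maven, and then clone the Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git repository from Azure Samples.
Update the Maven project for Cassandra
Go to the Maven project folder azure-cosmos-db-cassandra-java-getting-started-main and make the changes that this example requires.
Maven pom.xml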
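<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency>
  </dependencies>
</project>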
Cosmos DB for Apache Cassandra connection settings
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
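###Cassandra endpoint details on cosmosdb
cassandra_host = <account-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username = <cassandra-username>
cassandra_password = <cassandra-password>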
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>
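CassandraUtils reads cassandra_host, cassandra_port, cassandra_username, and cassandra_password from this file and overwrites its defaults with whatever it finds, so a missing key only surfaces when the job runs. The following is a minimal sketch that checks the file content up front with java.util.Properties, the same class the sample's Configurations utility uses. ConfigCheck and its methods are hypothetical helpers, not part of the Azure sample, and the host value is a placeholder.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of the sample): checks the keys CassandraUtils reads.
class ConfigCheck {

    // Keys with no usable fallback in CassandraUtils; the ssl_keystore_* keys are optional.
    static final String[] REQUIRED_KEYS = {
        "cassandra_host", "cassandra_port", "cassandra_username", "cassandra_password"
    };

    // Parses properties text the same way Configurations does (java.util.Properties).
    public static Properties parse(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        return props;
    }

    // Returns the required keys that are missing or blank, in REQUIRED_KEYS order.
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String text = String.join("\n",
            "###Cassandra endpoint details on cosmosdb",
            "cassandra_host = <account-name>.cassandra.cosmos.azure.com",
            "cassandra_port = 10350",
            "#ssl_keystore_file_path=<SSL key store file location>");
        Properties props = parse(text);
        // Properties.load drops the spaces around '=', so the port parses cleanly
        System.out.println("port: " + Integer.parseInt(props.getProperty("cassandra_port")));
        System.out.println("missing: " + missingKeys(props));
    }
}
```

For this input the sketch prints port: 10350 and missing: [cassandra_username, cassandra_password]. Lines starting with # are comments to Properties, which is why the commented-out ssl_keystore_* keys are treated as unset and CassandraUtils falls back to the JDK cacerts.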
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr 9 2020 ../
-rw-r--r-- 1 root root 1105 Apr 9 2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr 9 2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/
util folder
Change ssl_keystore_file_path to match the location of the Java certificates. On the Apache Flink cluster running on HDInsight on AKS, the path is /usr/lib/jvm/msopenjdk-11-jre/lib/security.
package com.azure.cosmosdb.cassandra.util;
import com.datastax.driver.core.*;
import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;
/**
 * Cassandra utility class to handle the Cassandra Sessions
 */
public class CassandraUtils {
    private Cluster cluster;
    private Configurations config = new Configurations();
    // Defaults only; loadCassandraConnectionDetails() overwrites them from config.properties
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "<cassandra-username>";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";
    /**
     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
     */
    public Session getSession() {
        try {
            //Load cassandra endpoint details from config.properties
            loadCassandraConnectionDetails();
            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());
            }
            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(keyStore);
            // Creates a TLS 1.2 SSL context from the key store contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());
            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
                    .withSSLContext(sc)
                    .build();
            cluster = Cluster.builder()
                    .addContactPoint(cassandraHost)
                    .withPort(cassandraPort)
                    .withCredentials(cassandraUsername, cassandraPassword)
                    .withSSL(sslOptions)
                    .build();
            return cluster.connect();
        } catch (Exception ex) {
            // Fail loudly instead of returning null, which would only surface later as a NullPointerException
            throw new IllegalStateException("Unable to create a Cassandra session", ex);
        }
    }
    public Cluster getCluster() {
        return cluster;
    }
    /**
     * Closes the cluster and Cassandra session
     */
    public void close() {
        if (cluster != null) {
            cluster.close();
        }
    }
    /**
     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
     */
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");
        // If ssl_keystore_file_path is not set, build the path using the JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            }
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
        }
        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;
        sslKeyStoreFile = new File(ssl_keystore_file_path);
        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
        }
    }
}
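The keystore fallback in loadCassandraConnectionDetails() and the TLS setup in getSession() use only JDK classes, so they can be tried outside the cluster. The sketch below is a simplified, hypothetical version (KeystoreResolver is not part of the sample). It assumes JAVA_HOME on the Flink cluster is /usr/lib/jvm/msopenjdk-11-jre, inferred from the certificate path above. Unlike the sample, it passes only trust managers, taken from the JVM's default trust store, to SSLContext, on the assumption that the client authenticates with a username and password rather than a client certificate.

```java
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper mirroring the keystore fallback and TLS setup in CassandraUtils.
class KeystoreResolver {

    // Uses the configured path if present, otherwise <JAVA_HOME>/lib/security/cacerts.
    public static String resolveKeystorePath(String configuredPath, String javaHome) {
        if (configuredPath != null && !configuredPath.isEmpty()) {
            return configuredPath;
        }
        if (javaHome == null || javaHome.isEmpty()) {
            throw new IllegalStateException("JAVA_HOME not set");
        }
        return javaHome + "/lib/security/cacerts";
    }

    // Builds a TLS 1.2 context that trusts the JVM's default trust store.
    public static SSLContext tls12ContextWithDefaultTrust() throws GeneralSecurityException {
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init((KeyStore) null); // null selects the default trust store
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        // No key managers: this sketch assumes password authentication, not a client certificate
        context.init(null, tmf.getTrustManagers(), new SecureRandom());
        return context;
    }

    public static void main(String[] args) throws GeneralSecurityException {
        // Assumed JAVA_HOME on the Flink cluster
        System.out.println(resolveKeystorePath(null, "/usr/lib/jvm/msopenjdk-11-jre"));
        System.out.println(resolveKeystorePath("/tmp/custom.jks", "/usr/lib/jvm/msopenjdk-11-jre"));
        System.out.println(tls12ContextWithDefaultTrust().getProtocol());
    }
}
```

Run as-is, it prints /usr/lib/jvm/msopenjdk-11-jre/lib/security/cacerts, then /tmp/custom.jks, then TLSv1.2. An explicitly configured ssl_keystore_file_path always wins over the JAVA_HOME fallback, as in the sample.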
package com.azure.cosmosdb.cassandra.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Configuration utility to read the configurations from properties file
 */
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;
    private void loadProperties() throws IOException {
        // config.properties is read from the classpath (src/main/resources is packaged into the jar)
        try (InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE)) {
            if (input == null) {
                LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
                throw new IOException("Unable to find " + PROPERTY_FILE + " on the classpath");
            }
            Properties loaded = new Properties();
            loaded.load(input);
            prop = loaded;
        }
    }
    public String getProperty(String propertyName) throws IOException {
        // Load the file lazily on first access and cache it for later calls
        if (prop == null) {
            loadProperties();
        }
        return prop.getProperty(propertyName);
    }
}
package com.azure.cosmosdb.cassandra.examples;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;
public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {
    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        // A new cluster connection and session are opened for every record
        CassandraUtils utils = new CassandraUtils();
        Session cassandraSession = utils.getSession();
        try {
            UserRepository repository = new UserRepository(cassandraSession);
            //Insert rows into user table
            PreparedStatement preparedStatement = repository.prepareInsertStatement();
            repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);
        } finally {
            // Close the session first, then the cluster that owns it
            if (null != cassandraSession) cassandraSession.close();
            utils.close();
        }
    }
}
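CassandraSink opens a new connection and session for every record, which is simple but expensive at higher message rates. A common alternative in Flink is to extend RichSinkFunction and open the connection once per parallel subtask in open(), reuse it in invoke(), and release it in close(). The sketch below only imitates that lifecycle in plain Java so it runs without Flink or the Cassandra driver; FakeSession, PerRecordSink, PerSubtaskSink, and SinkLifecycleDemo are hypothetical stand-ins, and the counter just shows how many sessions each approach opens.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Cassandra session; counts how many get opened.
class FakeSession implements AutoCloseable {
    static int opened = 0;

    FakeSession() {
        opened++;
    }

    void insert(String row) {
        // A real sink would execute the prepared INSERT here.
    }

    @Override
    public void close() {
        // A real sink would close the session and cluster here.
    }
}

// Mirrors CassandraSink: one session per record.
class PerRecordSink {
    void invoke(String row) {
        try (FakeSession session = new FakeSession()) {
            session.insert(row);
        }
    }
}

// Mirrors the RichSinkFunction lifecycle: open() once, invoke() per record, close() once.
class PerSubtaskSink {
    private FakeSession session;

    void open() {
        session = new FakeSession();
    }

    void invoke(String row) {
        session.insert(row);
    }

    void close() {
        if (session != null) {
            session.close();
        }
    }
}

class SinkLifecycleDemo {
    // Feeds the same rows to both sinks; returns {sessions opened per record, per subtask}.
    public static int[] compare(List<String> rows) {
        FakeSession.opened = 0;
        PerRecordSink perRecord = new PerRecordSink();
        for (String row : rows) {
            perRecord.invoke(row);
        }
        int perRecordOpens = FakeSession.opened;

        FakeSession.opened = 0;
        PerSubtaskSink perSubtask = new PerSubtaskSink();
        perSubtask.open();
        for (String row : rows) {
            perSubtask.invoke(row);
        }
        perSubtask.close();
        return new int[] {perRecordOpens, FakeSession.opened};
    }

    public static void main(String[] args) {
        int[] opens = compare(Arrays.asList("1,a,x", "2,b,y", "3,c,z"));
        System.out.println("per record: " + opens[0] + ", per subtask: " + opens[1]);
    }
}
```

For three records this prints per record: 3, per subtask: 1. With the job's parallelism of 1, the per-subtask pattern would keep a single Cosmos DB connection for the lifetime of the job.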
Main class: CassandraDemo.java
- Replace the Kafka broker IPs with the broker IPs of your Kafka cluster.
- Prepare the topic:
  - user
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("user")
                .setGroupId("cosmosdb-cassandra-sink") // hypothetical consumer group name
                .setStartingOffsets(OffsetsInitializer.earliest()) // read the topic from the beginning
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // 2. convert each "id,user,city" message to a Tuple3
        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;
        }).returns(Types.TUPLE(Types.INT, Types.STRING, Types.STRING)); // lambdas lose generic types, so declare them
        // 3. sink to Cosmos DB for Apache Cassandra
        dataStream.addSink(new CassandraSink());
        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
    }
}
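The map step in CassandraDemo assumes every message is id,user,city with an integer ID, so a single malformed line throws and fails the job. A minimal parsing sketch that rejects bad lines instead is below; it uses a hypothetical UserEvent class in place of Flink's Tuple3 so it runs with the JDK alone. In Flink, the same check could sit in a flatMap that emits only the lines that parse. One data point worth noting: the producer script uses epoch seconds as the ID, which fits in a Java int (maximum 2147483647) only until January 19, 2038.

```java
import java.util.Optional;

// Hypothetical value type standing in for Flink's Tuple3<Integer, String, String>.
final class UserEvent {
    final int id;
    final String user;
    final String city;

    UserEvent(int id, String user, String city) {
        this.id = id;
        this.user = user;
        this.city = city;
    }

    @Override
    public String toString() {
        return id + "/" + user + "/" + city;
    }
}

class UserEventParser {
    // Parses "id,user,city"; returns empty for a wrong field count or a non-int ID.
    public static Optional<UserEvent> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(fields[0].trim());
            return Optional.of(new UserEvent(id, fields[1].trim(), fields[2].trim()));
        } catch (NumberFormatException e) {
            // Also covers IDs beyond Integer.MAX_VALUE (2147483647)
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1684123456,alice,seattle")); // Optional[1684123456/alice/seattle]
        System.out.println(parse("abc,alice,seattle"));        // Optional.empty
        System.out.println(parse("1684123456,alice"));         // Optional.empty
    }
}
```

The sample values alice and seattle are made up for illustration. Whether to drop, log, or route rejected lines to a side output is a design choice this sketch leaves open.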
Run mvn clean install from the azure-cosmos-db-cassandra-java-getting-started-main folder to build the project. This command generates cosmosdb-cassandra-examples.jar in the target folder.
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root 4096 May 15 03:54 ./
drwxr-xr-x 7 root root 4096 May 15 03:54 ../
drwxr-xr-x 2 root root 4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root 4096 May 15 03:54 classes/
-rw-r--r-- 1 root root 15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root 4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root 4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root 4096 May 15 03:54 maven-status/
Upload the jar for Apache Flink job submission
Upload the jar to Azure Storage, and then download it into webssh with wget.
msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar
Prepare the Cosmos DB keyspace and table
Run the UserProfile class in /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples to create the keyspace and table in Azure Cosmos DB.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar
Sink the Kafka topic into Cosmos DB for Apache Cassandra
Run the CassandraDemo class to sink the Kafka topic into Cosmos DB for Apache Cassandra.
bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar
Validate the Apache Flink job submission
Check the job in the Flink Web UI of the HDInsight on AKS cluster.
Produce messages in Kafka
Publish messages to the Kafka topic.
sshuser@hn0-flinkd:~$ cat user.py
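import sys
import time
from datetime import datetime
import random
# Placeholder sample values; replace them with your own user names and cities
user_set = ['<user-1>', '<user-2>', '<user-3>']
city_set = ['<city-1>', '<city-2>', '<city-3>', '<city-4>', '<city-5>']

def main():
    while True:
        # Epoch seconds serve as the record ID (CassandraDemo parses it as an int)
        unique_id = str(int(time.time()))
        # About 40% of records pick one of the first three cities
        if random.randrange(10) < 4:
            city = random.choice(city_set[:3])
        else:
            city = random.choice(city_set)
        user = random.choice(user_set)
        print(unique_id + "," + user + "," + city)
        # Flush so each line reaches kafka-console-producer.sh immediately through the pipe
        sys.stdout.flush()
        # Wait a second so consecutive records get distinct IDs
        time.sleep(1)

if __name__ == "__main__":
    main()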
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516
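In user.py, four out of ten records (random.randrange(10) < 4) pick a city from the first three entries of city_set, and the rest pick uniformly from the whole list, so the first three cities are overrepresented. With n cities, each of the first three gets an expected share of 0.4/3 + 0.6/n and each other city gets 0.6/n; for example, with five cities that is about 0.253 versus 0.12. The Java sketch below reproduces that selection rule with a seeded java.util.Random so the skew can be checked offline; CitySkew, the city names, and the sample count are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical Java port of the city-selection rule in user.py.
class CitySkew {

    // 40% of the time pick from the first three cities, otherwise from all cities.
    public static String pickCity(List<String> cities, Random random) {
        if (random.nextInt(10) < 4) {
            return cities.get(random.nextInt(3));
        }
        return cities.get(random.nextInt(cities.size()));
    }

    // Expected share of the city at 'index': 0.4/3 + 0.6/n for the first three, 0.6/n otherwise.
    public static double expectedShare(int index, int n) {
        double uniform = 0.6 / n;
        return index < 3 ? 0.4 / 3 + uniform : uniform;
    }

    public static void main(String[] args) {
        List<String> cities = Arrays.asList("city-a", "city-b", "city-c", "city-d", "city-e");
        Random random = new Random(42); // fixed seed for repeatable runs
        int samples = 100_000;
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < samples; i++) {
            counts.merge(pickCity(cities, random), 1, Integer::sum);
        }
        for (int i = 0; i < cities.size(); i++) {
            String city = cities.get(i);
            double observed = counts.getOrDefault(city, 0) / (double) samples;
            System.out.printf("%s observed %.3f expected %.3f%n",
                city, observed, expectedShare(i, cities.size()));
        }
    }
}
```

The observed shares should land close to the expected ones; the skew matters if you later aggregate by city, because the first three cities will dominate the counts.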
View the Apache Cassandra table of Cosmos DB on the Azure portal
- Azure Samples
