共用方式為


使用部署在 AKS 上的 HDInsight 上的 Apache Flink®,將 Apache Kafka® 訊息匯入至適用於 Apache Cassandra 的 Azure Cosmos DB

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 預覽資訊上的 Azure HDInsight。 若有疑問或功能建議,請在 AskHDInsight 提交請求,並關注我們以獲得 Azure HDInsight 社群 最新更新。

此範例會使用 Apache Flink,將適用於 Apache Kafka 的 HDInsight HDInsight 訊息接收至適用於 Apache Cassandra 的 Azure Cosmos DB

當工程師偏好實時匯總數據進行分析時,此範例十分突出。 透過存取歷史匯總數據,您可以建置機器學習服務 (ML) 模型來建置深入解析或動作。 您也可以將IoT資料內嵌至Apache Flink,以即時匯總數據,並將其儲存在Apache Cassandra 中。

先決條件

適用於 Apache Cassandra 的 Azure Cosmos DB

適用於 Apache Cassandra 的 Azure Cosmos DB 可作為針對 Apache Cassandra 所撰寫應用程式的數據存放區。 此相容性表示,藉由使用符合 CQLv4 的現有 Apache 驅動程式,您現有的 Cassandra 應用程式現在可以與適用於 Cassandra 的 API 通訊。

如需詳細資訊,請參閱下列連結。

螢幕快照,顯示如何在 Azure 入口網站上建立適用於 Apache Cassandra 的 Azure Cosmos DB。

取得認證會在 Stream 原始程式碼上使用:

顯示如何在數據流原始程式碼上取得認證的螢幕快照。

實施

在Ubuntu VM上,讓我們準備開發環境

複製 Azure 範例的存放庫

請參閱 GitHub 自述檔以下載 maven,並使用 Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git 來自 Azure 範例 複製此存放庫。

更新 Cassandra 的 Maven 專案

移至 maven 項目資料夾 azure-cosmos-db-cassandra-java-getting-started-main,並更新此範例所需的變更。

專家 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.azure.cosmosdb.cassandra</groupId>
  <artifactId>cosmosdb-cassandra-examples</artifactId>
   <version>1.0-SNAPSHOT</version>
  <dependencies>
            <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-files</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-core</artifactId>
          <version>3.3.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-mapping</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-extras</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.5</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.5</version>
       </dependency>
   </dependencies>
 
   <build>
       <plugins>
           <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
               <configuration>
                   <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                   </descriptorRefs>
                  <finalName>cosmosdb-cassandra-examples</finalName>
                  <appendAssemblyId>false</appendAssemblyId>
               </configuration>
               <executions>
                   <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                       <goals>
                          <goal>single</goal>
                       </goals>
                   </execution>
               </executions>
           </plugin>
           <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
 
</project>

適用於 Apache Cassandra 連線設定的 Cosmos DB

您必須更新主機名稱和用戶名稱,以及下面程式碼片段中的金鑰。

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>

來源結構

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr  9  2020 ../
-rw-r--r-- 1 root root 1105 Apr  9  2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr  9  2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/

util 資料夾
CassandraUtils.java

注意

變更ssl_keystore_file_path取決於java憑證位置。 在 AKS 上的 HDInsight 上運行的 Apache Flink 叢集,其路徑為 /usr/lib/jvm/msopenjdk-11-jre/lib/security

package com.azure.cosmosdb.cassandra.util;

import com.datastax.driver.core.*;

import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;

/**
 * Cassandra utility class to handle the Cassandra Sessions
 */
public class CassandraUtils {

    private Cluster cluster;
    private Configurations config = new Configurations();
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "localhost";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";


    /**
     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
     */
    public Session getSession() {

        try {
            //Load cassandra endpoint details from config.properties
            loadCassandraConnectionDetails();

            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());
            }

            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                    .getDefaultAlgorithm());
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
                    .getDefaultAlgorithm());
            tmf.init(keyStore);

            // Creates a socket factory for HttpsURLConnection using JKS contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());

            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
                    .withSSLContext(sc)
                    .build();
            cluster = Cluster.builder()
                    .addContactPoint(cassandraHost)
                    .withPort(cassandraPort)
                    .withCredentials(cassandraUsername, cassandraPassword)
                    .withSSL(sslOptions)
                    .build();

            return cluster.connect();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return null;
    }

    public Cluster getCluster() {
        return cluster;
    }

    /**
     * Closes the cluster and Cassandra session
     */
    public void close() {
        cluster.close();
    }

    /**
     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
     */
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");

        // If ssl_keystore_file_path, build the path using JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            }
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
        }

        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;

        sslKeyStoreFile = new File(ssl_keystore_file_path);

        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
        }
    }
}

Configurations.java

package com.azure.cosmosdb.cassandra.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Configuration utility to read the configurations from properties file
 */
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;

    private void loadProperties() throws IOException {
        InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
        if (input == null) {
            LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
            return;
        }
        prop = new Properties();
        prop.load(input);
    }

    public String getProperty(String propertyName) throws IOException {
        if (prop == null) {
            loadProperties();
        }
        return prop.getProperty(propertyName);

    }
}

範例資料夾

CassandraSink.java

package com.azure.cosmosdb.cassandra.examples;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;

public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {

            CassandraUtils utils = new CassandraUtils();
            Session cassandraSession = utils.getSession();
            try {
                UserRepository repository = new UserRepository(cassandraSession);

                //Insert rows into user table
                PreparedStatement preparedStatement = repository.prepareInsertStatement();
                repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);

                } finally {
                        if (null != utils) utils.close();
                        if (null != cassandraSession) cassandraSession.close();
                        }
            }
}

主要類別 :CassandraDemo.java

注意

  • 將 Kafka Broker IP 替換為您的 Kafka 叢集 broker IP
  • 準備主題
    • 使用者 /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("user")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafka.print();

        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;
        }).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));


        dataStream.addSink(new CassandraSink());

        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
    }
}

建置專案

從 azure-cosmos-db-cassandra-java-getting-started-main 資料夾中執行 mvn clean install 以建置專案。 此命令會在目標資料夾下產生cosmosdb-cassandra-examples.jar。

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root     4096 May 15 03:54 ./
drwxr-xr-x 7 root root     4096 May 15 03:54 ../
drwxr-xr-x 2 root root     4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root     4096 May 15 03:54 classes/
-rw-r--r-- 1 root root    15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root     4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root     4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root     4096 May 15 03:54 maven-status/

將 jar 上傳至 Azure 記憶體,並將 wget 上傳至 webssh

msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar

準備 Cosmos DB 金鑰庫和資料表

在 /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples 中執行 UserProfile 類別,以建立 Azure Cosmos DB 的密鑰存放區和數據表。

bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar

將 Kafka 主題匯入至適用於 Apache Cassandra 的 Cosmos DB

執行 CassandraDemo 類別,將 Kafka 主題接收至適用於 Apache Cassandra 的 Cosmos DB。

bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar

顯示如何執行 Cassandra 示範的螢幕快照。

在 AKS 叢集的 HDInsight 中,檢查 Flink Web UI 的工作。

顯示如何在 AKS Flink UI 上檢查 HDInsight 作業的螢幕快照。

在 Kafka 中產生訊息

將訊息發布到 Kafka 主題。

sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random

user_set = [
        'John',
        'Mike',
        'Lucy',
        'Tom',
        'Machael',
        'Lily',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

city_set = [
        'Atmore',
        'Auburn',
        'Bessemer',
        'Birmingham',
        'Chickasaw',
        'Clanton',
        'Decatur',
        'Florence',
        'Greenville',
        'Jasper',
        'Huntsville',
        'Homer',
        'Homer'
]

def main():
        while True:
                unique_id = str(int(time.time()))
                if random.randrange(10) < 4:
                        city = random.choice(city_set[:3])
                else:
                        city = random.choice(city_set)
                user = random.choice(user_set)
                print(unique_id + "," + user + "," + city )
                time.sleep(1)

if __name__ == "__main__":
    main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516

查看 Azure 入口網站上 Cosmos DB 的 Apache Cassandra 資料表

Azure 入口網站上顯示適用於 Apache Cassandra 的 Cosmos DB 螢幕快照。

偏好