使用雙重寫入 Proxy 和 Apache Spark,將資料從 Apache Cassandra 即時移轉至 Azure Cosmos DB for Apache Cassandra
因為各種原因,Azure Cosmos DB 中的 API for Cassandra 已成為 Apache Cassandra 上所執行企業工作負載的絕佳選擇,例如:
沒有管理和監視額外負荷:可消除跨作業系統、JVM 和 yaml 檔案和其互動的無數種管理和監視設定額外負荷。
可大幅節省成本:您可以使用 Azure Cosmos DB 來節省成本,包括 VM、頻寬和任何適用授權的成本。 此外,您也不需要管理資料中心、伺服器、SSD 儲存體、網路和電力的成本。
可使用現有的程式碼和工具:Azure Cosmos DB 提供與現有 Cassandra SDK 和工具相容的有線通訊協定層級。 此相容性可確保您可以透過 Azure Cosmos DB for Apache Cassandra 使用現有程式碼基底執行瑣碎的變更。
Azure Cosmos DB 不支援以原生 Apache Cassandra Gossip 通訊協定進行複寫。 因此,若要以零停機時間的方式進行移轉,則需要不同的方法。 本教學課程描述如何使用雙重寫入 Proxy 和 Apache Spark,將資料從原生 Apache Cassandra 叢集即時移轉至 Azure Cosmos DB for Apache Cassandra。
下方為該模式的圖解。 雙重寫入 Proxy 用來捕捉即時變更,而 Apache Spark 則用來大量複製歷程記錄資料。 Proxy 可以接受來自應用程式程式碼的連線,而不需要變更任何設定。 其會將所有要求路由至您的來源資料庫,並在進行大量複製時,以非同步方式將寫入路由至 API for Cassandra。
必要條件
請檢閱 Azure Cosmos DB for Apache Cassandra 中支援的功能,以確保相容性。
請確保您的來源叢集與目標 API for Cassandra 端點之間有網路連線。
請確保您已將 keyspace/資料表配置從來源 Cassandra 資料庫移轉至目標 API for Cassandra 帳戶。
重要
如果您需要在移轉期間保留 Apache Cassandra
writetime
,則在建立資料表時必須設定下列旗標:with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
例如:
CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users ( name text, userID int, address text, phone int, PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
佈建 Spark 叢集
建議使用 Azure Databricks。 使用支援 Spark 3.0 或更高版本的執行階段。
重要
您必須確保您的 Azure Databricks 帳戶與來源 Apache Cassandra 叢集具有網路連線能力。 這可能會需要插入 VNet。 如需詳細資訊,請參閱這裡的文章。
新增 Spark 相依性
您需要將 Apache Spark Cassandra 連接器程式庫新增至您的叢集,以連線至原生和 Azure Cosmos DB Cassandra 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後在 Maven 座標中新增 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
。
重要
如果您需要在移轉期間針對每個資料列保留 Apache Cassandra writetime
,我們建議使用此範例。 此範例中的相依性 jar 也包含了 Spark 連接器,因此請留意您應該安裝的是此範例,而不是上述的連接器組件。 如果您想要在記錄資料載入完成後在來源與目標之間執行資料列比較驗證,此範例也能幫上忙。 如需詳細資訊,請參閱下方的「執行歷程記錄資料載入」和「驗證來源和目標」章節。
選取 [安裝],然後在安裝完成時重新啟動叢集。
注意
安裝 Cassandra 連接器程式庫之後,請務必重新啟動 Azure Databricks 叢集。
安裝雙重寫入 Proxy
為了在執行雙重寫入時能達到最佳效能,建議您在來源 Cassandra 叢集中的所有節點上安裝 Proxy。
#assuming you do not have git already installed
sudo apt-get install git
#assuming you do not have maven already installed
sudo apt install maven
#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git
#change directory
cd cassandra-proxy
#compile the proxy
mvn package
啟動雙重寫入 Proxy
建議您在來源 Cassandra 叢集中的所有節點上安裝 Proxy。 至少要執行下列命令,以在每個節點上啟動 Proxy。 將 <target-server>
取代為目標叢集中其中一個節點的 IP 或伺服器位址。 以本機 .jks 檔案的路徑取代 <path to JKS file>
,並以對應的密碼取代 <keystore password>
。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>
以這種方式啟動 Proxy 便代表假設符合了下列條件:
- 來源和目標端點具有相同的使用者名稱和密碼。
- 來源和目標端點使用的是安全通訊端層 (SSL)。
如果您的來源和目標端點無法符合這些條件,請繼續閱讀以取得進一步的設定選項。
設定 SSL
針對 SSL,您可以執行現有的金鑰儲存區 (例如,您的來源叢集所使用的金鑰儲存區) 或使用 keytool
建立自我簽署憑證:
keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048
如果來源或目標端點未使用 SSL,您也可以停用 SSL。 使用 --disable-source-tls
或 --disable-target-tls
旗標:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true --disable-target-tls true
注意
當您透過 Proxy 建立對資料庫的 SSL 連線時,請確定您的用戶端應用程式所使用的金鑰儲存區和密碼,與用於雙重寫入 Proxy 的相同。
設定認證和連接埠
根據預設,來源認證將會從您的用戶端應用程式傳遞。 Proxy 將使用認證來連線到來源和目標叢集。 如先前所述,此程式會假設來源的認證和目標的認證相同。 啟動 Proxy 時,必須分別為目標 API for Cassandra 端點指定不同的使用者名稱和密碼:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
若未特別指定,預設的來源和目標連接埠埠將會是 9042。 在此情況下,API for Cassandra 會在連接埠上執行 10350
,因此您需要使用 --source-port
或 --target-port
來指定連接埠號碼:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
遠端部署 Proxy
在某些情況下,您可能不想將 Proxy 安裝在叢集節點本身,而是想要將它安裝在不同的電腦上。 在這種情況下,您必須指定 <source-server>
的 IP 位址:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>
警告
在不同的電腦上遠端安裝和執行 Proxy (而不是在來源 Apache Cassandra 叢集中的所有節點上執行 Proxy) 會在進行即時移轉時影響到效能表現。 雖然在功能上確實能夠運作,但用戶端驅動程式無法開啟叢集內所有節點的連線,而且會依賴已安裝 Proxy 的單一協調員節點進行連線。
毫不允許應用程式程式碼變更
根據預設,Proxy 會在連接埠 29042 上接聽。 應用程式程式碼必須變更為指向此連接埠。 不過,您可以變更 Proxy 所接聽的連接埠。 你可以透過下列方式來排除應用程式層級的程式碼變更:
- 讓來源 Cassandra 伺服器在不同的連接埠上執行。
- 讓 Proxy 在標準 Cassandra 連接埠 9042 上執行。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042
注意
在叢集節點上安裝 Proxy 並不需要重新啟動節點。 但是如果您有許多應用程式用戶端,而且想要讓 Proxy 在標準 Cassandra 連接埠 9042 上執行以避免任何應用層級的程式碼變更,您需要變更 Apache Cassandra 預設連接埠。 您接著必須重新啟動叢集中的節點,並將來源連接埠設定為您為來源 Cassandra 叢集定義的新連接埠。
在下列範例中,我們將來源 Cassandra 叢集變更為在連接埠 3074 上執行,然後在連接埠 9042 上啟動叢集:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074
強制通訊協定
Proxy 具有強制通訊協定的功能,在來源端點比目標更先進或不受支援時將可能需要使用功能。 在這種情況下,您可以指定 --protocol-version
和 --cql-version
來強制通訊協定與目標相符:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11
在雙重寫入 Proxy 執行之後,您必須變更應用程式用戶端上的連接埠,然後重新啟動。 (如果您選擇該方法,您也能變更 Cassandra 連接埠,並重新啟動叢集。)Proxy 接著會開始將寫入轉接至目標端點。 您可以在此瞭解 Proxy 工具中可用的 監視和計量。
執行歷程資料載入
若要載入該資料,請在您的 Azure Databricks 帳戶中建立 Scala 筆記本。 將您的來源和目標 Cassandra 設定取代為對應的認證,以及來源和目標 keyspace 和資料表。 在下列範例中,視需要為每個資料表新增更多變數,然後執行。 在應用程式開始將要求傳送到雙重寫入 Proxy 之後,您即能開始轉移歷程記錄資料。
重要
在移轉資料之前,請將容器輸送量增加至所需數量,好讓您的應用程式可以快速地移轉。 在開始移轉之前,先調整輸送量,可協助您縮短移轉資料的時間。 為了防止在歷程記錄資料載入期間出現速率限制的情況,建議您在 API for Cassandra 中啟用伺服器端重試 (SSR)。 請在這裡參閱我們的文章以取得更多詳細資訊,以及關於如何啟用 SSR 的指示。
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "10350",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.option("writetime", timestamp)
.mode(SaveMode.Append)
.save
注意
在先前的 Scala 範例中,您會發現在讀取來源資料表中的所有資料之前,timestamp
將會設定為目前的時間。 然後,writetime
會設定為追溯的時間戳記。 這麼做能確保讀取歷程資料時,從歷程資料負載寫入至目標端點的紀錄,不會覆蓋掉帶有較晚時間戳記,來自雙重寫入 Proxy 的更新。
重要
如果您基於任何原因而需要保留「確切」的時間戳記,您應該採用歷程資料移轉方法來保留時間戳記,例如此範例。 範例中的相依性 jar 也包含了 Spark 連接器,因此您不需要安裝先前必要條件中所述的 Spark 連接器元件,因為在 Spark 叢集中同時安裝兩者將會造成衝突。
驗證來源和目標
在歷程資料載入完成後,您的資料庫應該已同步完成並準備好進行完全移轉。 不過,我們建議您先驗證來源和目標,以在完全移轉之前確保它們相符。
注意
如果您使用上述的Cassandra 移轉程式範例來保留 writetime
,這包括了藉由根據特定容錯度來比較來源和目標中的資料列,以驗證移轉的能力。