共用方式為


使用 Azure Databricks 將資料從 Cassandra 移轉至 Azure Cosmos DB for Apache Cassandra 帳戶

適用於: Cassandra

因為數個原因,Azure Cosmos DB 中的 API for Cassandra 已成為在 Apache Cassandra 上執行企業工作負載的絕佳選擇:

  • 沒有管理和監視額外負荷:可消除跨作業系統、JVM 和 YAML 檔案和其互動的管理和監視設定額外負荷。

  • 大幅節省成本:您可以使用 Azure Cosmos DB 來節省成本,包括 VM、頻寬和任何適用授權的成本。 您不需要管理資料中心、伺服器、SSD 儲存體、網路和電力成本。

  • 可使用現有的程式碼和工具:Azure Cosmos DB 提供與現有 Cassandra SDK 和工具相容的有線通訊協定層級。 此相容性確保您可以透過 Azure Cosmos DB for Apache Cassandra 使用現有程式碼基底來執行瑣碎的變更。

有許多方法可以將資料庫工作負載從某個平台移轉至另一個平台。 Azure Databricks 是適用於 Apache Spark 的平台即服務 (PaaS) 供應項目,可提供方法來大規模執行離線移轉。 本文描述使用 Azure Databricks 將資料從原生 Apache Cassandra keyspace 和資料表移轉至 Azure Cosmos DB for Apache Cassandra 所需的步驟。

必要條件

佈建 Azure Databricks 叢集

您可以遵循指示來佈建 Azure Databricks 叢集。 建議您選取支援 Spark 3.0 的 Databricks Runtime 7.5 版本。

螢幕擷取畫面顯示尋找 Databricks 執行階段版本。

新增相依性

您需要將 Apache Spark Cassandra 連接器程式庫新增至您的叢集,以連線至原生和 Azure Cosmos DB Cassandra 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後在 Maven 座標中新增 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0

螢幕擷取畫面顯示搜尋 Databricks 中 Maven 套件。

選取 [安裝],然後在安裝完成時重新啟動叢集。

注意

安裝 Cassandra 連接器程式庫之後,請務必重新啟動 Databricks 叢集。

警告

本文所示的範例已與 Spark 3.0.1 版本和對應的 Cassandra Spark 連接器 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 一同測試。 較新版本的 Spark 和/或 Cassandra 連接器可能無法如預期般運作。

建立 Scala 筆記本以進行移轉

在 Databricks 中建立 Scala 筆記本。 將您的來源和目標 Cassandra 設定取代為對應的認證以及來源和目標 keyspace 和資料表。 然後執行下列程式碼:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

注意

spark.cassandra.output.batch.size.rowsspark.cassandra.output.concurrent.writes 值以及 Spark 叢集中的背景工作數目是為了避免速率限制而需要微調的重要設定。 Azure Cosmos DB 的要求超過佈建輸送量或要求單位 (RU) 時,就會發生速率限制。 根據 Spark 叢集中的執行程式數目,也可能根據寫入至目標資料表的每筆記錄大小 (因此根據 RU 成本),您可能需要調整這些設定。

疑難排解

速率限制 (429 錯誤)

即使您將設定減少為最小值,可能還是會看到 429 錯誤碼或「要求速率太大」錯誤文字。 下列案例可能會導致速率限制:

  • 配置給資料表的輸送量小於 6,000 個要求單位。 即使是最小值設定,Spark 還是可以用大約 6,000 (含) 個以上要求單位的速率進行寫入。 如果您已在具有共用輸送量的 keyspace 中佈建資料表,則此資料表可能會在執行階段具有 6,000 個以下的 RU。

    執行移轉時,請確定您要移轉至其中的資料表至少有 6,000 個 RU 可用。 如有必要,請將專用要求單位配置給該資料表。

  • 具有大量資料量的資料扭曲過多。 如果您有大量資料要移轉至指定的資料表,但在資料中有明顯的扭曲 (即針對相同的分割區索引鍵值寫入大量記錄),則即使您在資料表中佈建數個要求單位,仍然可能會遇到速率限制。 要求單位會平均分配至實體分割區,而大量資料扭曲可能會造成單一分割區要求的瓶頸。

    在此案例中,請減少為 Spark 中的最低輸送量設定,並強制緩慢地執行移轉。 如果您要移轉參考或控制資料表,但存取頻率較低且扭曲可能很高,則此案例可能較常見。 不過,如果任何其他類型的資料表中有明顯的扭曲,則建議您在穩定狀態期間檢閱您的資料模型,以避免工作負載的常用分割區問題。

下一步