Udostępnij za pośrednictwem


Migrowanie do wystąpienia zarządzanego platformy Azure dla systemu Apache Cassandra przy użyciu platformy Apache Spark

Jeśli to możliwe, zalecamy użycie natywnej replikacji apache Cassandra w celu przeprowadzenia migracji danych z istniejącego klastra do wystąpienia zarządzanego platformy Azure dla usługi Apache Cassandra przez skonfigurowanie klastra hybrydowego. Takie podejście będzie używać protokołu plotek platformy Apache Cassandra do replikowania danych ze źródłowego centrum danych do nowego centrum danych wystąpienia zarządzanego. Mogą jednak wystąpić pewne scenariusze, w których źródłowa wersja bazy danych nie jest zgodna lub konfiguracja klastra hybrydowego nie jest możliwa.

W tym samouczku opisano sposób migrowania danych do usługi Migrate to Azure Managed Instance for Apache Cassandra w trybie offline przy użyciu łącznika Cassandra Spark i usługi Azure Databricks dla platformy Apache Spark.

Wymagania wstępne

Aprowizuj klaster usługi Azure Databricks

Zalecamy wybranie środowiska Uruchomieniowego usługi Databricks w wersji 7.5, która obsługuje platformę Spark 3.0.

Zrzut ekranu przedstawiający znajdowanie wersji środowiska uruchomieniowego usługi Databricks.

Dodawanie zależności

Dodaj bibliotekę łącznika Apache Spark Cassandra do klastra, aby nawiązać połączenie z punktami końcowymi natywnymi i punktami końcowymi cassandra usługi Azure Cosmos DB. W klastrze wybierz pozycję Biblioteki>Zainstaluj nowe>narzędzie Maven, a następnie dodaj com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 współrzędne narzędzia Maven.

Zrzut ekranu przedstawiający wyszukiwanie pakietów Maven w usłudze Databricks.

Wybierz pozycję Zainstaluj, a następnie uruchom ponownie klaster po zakończeniu instalacji.

Uwaga

Upewnij się, że klaster usługi Databricks został uruchomiony ponownie po zainstalowaniu biblioteki łącznika Cassandra.

Tworzenie notesu Scala na potrzeby migracji

Tworzenie notesu Scala w usłudze Databricks. Zastąp źródłowe i docelowe konfiguracje bazy danych Cassandra odpowiednimi poświadczeniami oraz źródłowymi i docelowymi przestrzeniami kluczy i tabelami. Następnie uruchom następujący kod:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Uwaga

Jeśli musisz zachować oryginał writetime każdego wiersza, zapoznaj się z przykładem migracji cassandra.

Następne kroki