Condividi tramite


Eseguire la migrazione ad Azure Istanza gestita per Apache Cassandra con Apache Spark

Laddove possibile, è consigliabile usare la replica nativa apache Cassandra per eseguire la migrazione dei dati dal cluster esistente ad Azure Istanza gestita per Apache Cassandra configurando un cluster ibrido. Questo approccio userà il protocollo gossip di Apache Cassandra per replicare i dati dal data center di origine nel nuovo data center dell'istanza gestita. Tuttavia, potrebbero verificarsi alcuni scenari in cui la versione del database di origine non è compatibile o non è possibile configurare un cluster ibrido.

Questa esercitazione descrive come eseguire la migrazione dei dati ad Azure Istanza gestita per Apache Cassandra in modo offline usando il connettore Cassandra Spark e Azure Databricks per Apache Spark.

Prerequisiti

Effettuare il provisioning di un cluster di Azure Databricks

È consigliabile selezionare Databricks runtime versione 7.5, che supporta Spark 3.0.

Screenshot che mostra la ricerca della versione del runtime di Databricks.

Aggiungere le dipendenze

Aggiungere la libreria del connettore Cassandra apache Spark al cluster per connettersi agli endpoint Cassandra nativi e di Azure Cosmos DB. Nel cluster selezionare Librerie>Installa nuova>Maven e quindi aggiungere com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 nelle coordinate Maven.

Screenshot che mostra la ricerca di pacchetti Maven in Databricks.

Selezionare Installa e quindi riavviare il cluster al termine dell'installazione.

Nota

Assicurarsi di riavviare il cluster Databricks dopo l'installazione della libreria del connettore Cassandra.

Creare un notebook Scala per la migrazione

Creare un notebook Scala in Databricks. Sostituire le configurazioni Cassandra di origine e di destinazione con le credenziali corrispondenti e i keyspace e le tabelle di origine e di destinazione. Eseguire quindi il codice seguente:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Nota

Se è necessario mantenere l'originale writetime di ogni riga, fare riferimento al campione migratorio cassandra.

Passaggi successivi