Delen via


Migreren naar Azure Managed Instance voor Apache Cassandra met behulp van Apache Spark

Waar mogelijk raden we u aan om systeemeigen apache Cassandra-replicatie te gebruiken om gegevens van uw bestaande cluster te migreren naar Azure Managed Instance voor Apache Cassandra door een hybride cluster te configureren. Deze benadering maakt gebruik van het gossip-protocol van Apache Cassandra om gegevens van uw brondatacentrum te repliceren naar uw nieuwe datacenter van het beheerde exemplaar. Er kunnen echter scenario's zijn waarin de versie van uw brondatabase niet compatibel is of een hybride clusterinstallatie anders niet haalbaar is.

In deze zelfstudie wordt beschreven hoe u gegevens migreert naar Azure Managed Instance voor Apache Cassandra op een offline manier met behulp van de Cassandra Spark-connector en Azure Databricks voor Apache Spark.

Vereisten

Een Azure Databricks-cluster inrichten

U wordt aangeraden Databricks Runtime versie 7.5 te selecteren, die ondersteuning biedt voor Spark 3.0.

Schermopname van het vinden van de Databricks Runtime-versie.

Afhankelijkheden toevoegen

Voeg de Apache Spark Cassandra Connector-bibliotheek toe aan uw cluster om verbinding te maken met zowel systeemeigen als Azure Cosmos DB Cassandra-eindpunten. Selecteer In uw cluster Bibliotheken>installeren nieuwe>Maven en voeg vervolgens Maven-coördinaten toe.com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0

Schermopname van het zoeken naar Maven-pakketten in Databricks.

Selecteer Installeren en start het cluster opnieuw wanneer de installatie is voltooid.

Notitie

Zorg ervoor dat u het Databricks-cluster opnieuw start nadat de Cassandra Connector-bibliotheek is geïnstalleerd.

Scala Notebook maken voor migratie

Maak een Scala Notebook in Databricks. Vervang uw bron- en doelconfiguraties van Cassandra door de bijbehorende referenties en de bron- en doelsleutelruimten en tabellen. Voer vervolgens de volgende code uit:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Notitie

Als u het origineel writetime van elke rij moet behouden, raadpleegt u het cassandra-migratievoorbeeld .

Volgende stappen