Freigeben über


Migrieren von Daten aus Cassandra zu einem Azure Cosmos DB for Apache Cassandra-Konto mithilfe von Azure Databricks

GILT FÜR: Cassandra

API for Cassandra in Azure Cosmos DB hat sich aus mehreren Gründen zu einer guten Wahl für Unternehmensworkloads entwickelt, die unter Apache Cassandra ausgeführt werden:

  • Kein Mehraufwand für Verwaltung und Überwachung: Beseitigt den Mehraufwand der Verwaltung und Überwachung von Einstellungen für Betriebssysteme, JVM und YAML-Dateien und deren Interaktionen.

  • Erhebliche Kosteneinsparungen: Sie können Kosten sparen mit Azure Cosmos DB, einschließlich der Kosten für virtuelle Computer, Bandbreite und alle anwendbaren Lizenzen. Die Verwaltung von Kosten für Rechenzentren, Server, SSD-Speicher, Netzwerk und Strom entfallen.

  • Möglichkeit der Verwendung von vorhandenem Code und Tools: Azure Cosmos DB bietet Kompatibilität auf Verbindungsprotokollebene mit vorhandenen SDKs und Tools. Durch diese Kompatibilität ist sichergestellt, dass Sie die vorhandene Codebasis mit Azure Cosmos DB for Apache Cassandra mit nur geringfügigen Änderungen verwenden können.

Es gibt viele Möglichkeiten, Datenbankworkloads von einer Plattform zu einer anderen zu migrieren. Azure Databricks ist ein Platform as a Service-Angebot (PaaS) für Apache Spark, das Offlinemigrationsvorgänge in großem Umfang ermöglicht. In diesem Artikel werden die erforderlichen Schritte zum Migrieren von Daten aus nativen Apache Cassandra-Keyspaces und -Tabellen zu Azure Cosmos DB for Apache Cassandra mithilfe von Azure Databricks beschrieben.

Voraussetzungen

Bereitstellen eines Azure Databricks-Clusters

Nutzen Sie die Anweisungen zum Bereitstellen eines Azure Databricks-Clusters. Wir empfehlen die Auswahl der Version 7.5 der Databricks-Runtime, die Spark 3.0 unterstützt.

Screenshot, der die Suche nach der Databricks-Laufzeitversion zeigt.

Hinzufügen von Abhängigkeiten

Sie müssen dem Cluster die Apache Spark-Cassandra-Connectorbibliothek hinzufügen, um eine Verbindung mit nativen und Azure Cosmos DB-Cassandra-Endpunkten herzustellen. Wählen Sie in Ihrem Cluster Bibliotheken>Neue>Maveninstallieren und fügen Sie dann com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 in Maven-Koordinaten hinzu.

Screenshot, der zeigt, wie Maven-Pakete in Databricks gesucht werden.

Wählen Sie Installieren aus, und starten Sie den Cluster nach Abschluss der Installation neu.

Hinweis

Stellen Sie sicher, dass Sie den Databricks-Cluster neu starten, nachdem die Cassandra-Connectorbibliothek installiert wurde.

Warnung

Die in diesem Artikel gezeigten Beispiele wurden mit Spark Version 3.0.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.

Erstellen eines Scala-Notebooks für die Migration

Erstellen eines Scala Notebooks in Databricks. Ersetzen Sie Ihre Quell- und Zielkonfigurationen für Cassandra durch die entsprechenden Anmeldeinformationen und die Quell-und Zielkeyspaces und -tabellen. Führen Sie dann den folgenden Code aus:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Hinweis

Die spark.cassandra.output.batch.size.rows und spark.cassandra.output.concurrent.writes Werte und die Anzahl der Worker in Ihrem Spark-Cluster sind wichtige Konfigurationen, die optimiert werden müssen, um die Ratenbegrenzungzu vermeiden. Die Ratenbegrenzung erfolgt, wenn die Anforderungen an Azure Cosmos DB den bereitgestellten Durchsatz oder die Anforderungseinheiten (Request Units, RUS) überschreiten. Sie müssen diese Einstellungen möglicherweise abhängig von der Anzahl der Executors im Spark-Cluster und eventuell der Größe (und somit der RU-Kosten) jedes in die Zieltabellen geschriebenen Datensatzes anpassen.

Problembehandlung

Ratenbegrenzung (Fehler 429)

Möglicherweise wird der Fehlercode 429 oder "Anforderungsrate ist groß" angezeigt, auch wenn Sie die Einstellungen auf die Mindestwerte reduziert haben. Die folgenden Szenarien können die Ratenbegrenzung verursachen:

  • Der der Tabelle zugewiesene Durchsatz ist kleiner als 6,000 Anforderungseinheiten . Selbst bei minimalen Einstellungen kann Spark Schreibvorgänge mit einer Rate ab 6,000 Anforderungseinheiten ausführen. Wenn Sie eine Tabelle in einem Keyspace mit gemeinsamem Durchsatz bereitgestellt haben, ist es möglich, dass diese Tabelle zur Laufzeit weniger als 6,000 Anforderungseinheiten zur Verfügung hat.

    Stellen Sie sicher, dass die Tabelle, zu der migriert wird, mindestens 6.000 RUS verfügbar hat, wenn Sie die Migration ausführen. Weisen Sie der Tabelle ggf. dedizierte Anforderungseinheiten zu.

  • Übermäßige Datenschiefe bei großen Datenmengen. Wenn Sie sehr viele Daten in eine bestimmte Tabelle migrieren müssen, aber eine erhebliche Schiefe in den Daten vorliegt (d. h. eine große Anzahl von Datensätzen, die für denselben Partitionsschlüsselwert geschrieben werden), kann es trotzdem zu einer Ratenbegrenzung kommen, selbst wenn Sie mehrere Anforderungseinheiten in Ihrer Tabelle bereitgestellt haben. Anforderungseinheiten werden gleichmäßig auf die physischen Partitionen verteilt werden und eine starke Datenschiefe kann einen Engpass bei Anforderungen an eine einzelne Partition verursachen.

    In diesem Szenario reduzieren Sie die Durchsatzeinstellungen in Spark auf ein Minimum und erzwingen Sie eine langsame Ausführung der Migration. Dieses Szenario kann häufiger vorkommen, wenn Referenz- oder Steuertabellen migriert werden, bei denen der Zugriff weniger häufig erfolgt und die Datenschiefe hoch sein kann. Wenn jedoch eine erhebliche Schiefe in einem anderen Tabellentyp vorhanden ist, sollten Sie möglicherweise Ihr Datenmodell überprüfen, um Probleme mit heißen Partitionen für die Workload bei stabilen Vorgängen zu vermeiden.

Nächste Schritte