Migrieren von Daten aus Cassandra zu einem Azure Cosmos DB for Apache Cassandra-Konto mithilfe von Azure Databricks
GILT FÜR: Cassandra
API for Cassandra in Azure Cosmos DB hat sich aus mehreren Gründen zu einer guten Wahl für Unternehmensworkloads entwickelt, die unter Apache Cassandra ausgeführt werden:
Kein Mehraufwand für Verwaltung und Überwachung: Beseitigt den Mehraufwand der Verwaltung und Überwachung von Einstellungen für Betriebssysteme, JVM und YAML-Dateien und deren Interaktionen.
Erhebliche Kosteneinsparungen: Sie können Kosten sparen mit Azure Cosmos DB, einschließlich der Kosten für virtuelle Computer, Bandbreite und alle anwendbaren Lizenzen. Die Verwaltung von Kosten für Rechenzentren, Server, SSD-Speicher, Netzwerk und Strom entfallen.
Möglichkeit der Verwendung von vorhandenem Code und Tools: Azure Cosmos DB bietet Kompatibilität auf Verbindungsprotokollebene mit vorhandenen SDKs und Tools. Durch diese Kompatibilität ist sichergestellt, dass Sie die vorhandene Codebasis mit Azure Cosmos DB for Apache Cassandra mit nur geringfügigen Änderungen verwenden können.
Es gibt viele Möglichkeiten, Datenbankworkloads von einer Plattform zu einer anderen zu migrieren. Azure Databricks ist ein Platform as a Service-Angebot (PaaS) für Apache Spark, das Offlinemigrationsvorgänge in großem Umfang ermöglicht. In diesem Artikel werden die erforderlichen Schritte zum Migrieren von Daten aus nativen Apache Cassandra-Keyspaces und -Tabellen zu Azure Cosmos DB for Apache Cassandra mithilfe von Azure Databricks beschrieben.
Voraussetzungen
Stellen Sie ein Azure Cosmos DB for Apache Cassandra-Konto bereit.
Überprüfen Sie die in Azure Cosmos DB for Apache Cassandra unterstützten Features, um die Kompatibilität sicherzustellen.
Vergewissern Sie sich, dass im Zielkonto von Azure Cosmos DB for Apache Cassandra bereits leere Keyspaces und Tabellen erstellt wurden.
Bereitstellen eines Azure Databricks-Clusters
Nutzen Sie die Anweisungen zum Bereitstellen eines Azure Databricks-Clusters. Wir empfehlen die Auswahl der Version 7.5 der Databricks-Runtime, die Spark 3.0 unterstützt.
Hinzufügen von Abhängigkeiten
Sie müssen dem Cluster die Apache Spark-Cassandra-Connectorbibliothek hinzufügen, um eine Verbindung mit nativen und Azure Cosmos DB-Cassandra-Endpunkten herzustellen. Wählen Sie in Ihrem Cluster Bibliotheken>Neue>Maveninstallieren und fügen Sie dann com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
in Maven-Koordinaten hinzu.
Wählen Sie Installieren aus, und starten Sie den Cluster nach Abschluss der Installation neu.
Hinweis
Stellen Sie sicher, dass Sie den Databricks-Cluster neu starten, nachdem die Cassandra-Connectorbibliothek installiert wurde.
Warnung
Die in diesem Artikel gezeigten Beispiele wurden mit Spark Version 3.0.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Erstellen eines Scala-Notebooks für die Migration
Erstellen eines Scala Notebooks in Databricks. Ersetzen Sie Ihre Quell- und Zielkonfigurationen für Cassandra durch die entsprechenden Anmeldeinformationen und die Quell-und Zielkeyspaces und -tabellen. Führen Sie dann den folgenden Code aus:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val nativeCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "false",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val cosmosCassandra = Map(
"spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
"spark.cassandra.connection.port" -> "10350",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
//"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
"spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(nativeCassandra)
.load
//Write to CosmosCassandra
DFfromNativeCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(cosmosCassandra)
.mode(SaveMode.Append) // only required for Spark 3.x
.save
Hinweis
Die spark.cassandra.output.batch.size.rows
und spark.cassandra.output.concurrent.writes
Werte und die Anzahl der Worker in Ihrem Spark-Cluster sind wichtige Konfigurationen, die optimiert werden müssen, um die Ratenbegrenzungzu vermeiden. Die Ratenbegrenzung erfolgt, wenn die Anforderungen an Azure Cosmos DB den bereitgestellten Durchsatz oder die Anforderungseinheiten (Request Units, RUS) überschreiten. Sie müssen diese Einstellungen möglicherweise abhängig von der Anzahl der Executors im Spark-Cluster und eventuell der Größe (und somit der RU-Kosten) jedes in die Zieltabellen geschriebenen Datensatzes anpassen.
Problembehandlung
Ratenbegrenzung (Fehler 429)
Möglicherweise wird der Fehlercode 429 oder "Anforderungsrate ist groß" angezeigt, auch wenn Sie die Einstellungen auf die Mindestwerte reduziert haben. Die folgenden Szenarien können die Ratenbegrenzung verursachen:
Der der Tabelle zugewiesene Durchsatz ist kleiner als 6,000 Anforderungseinheiten . Selbst bei minimalen Einstellungen kann Spark Schreibvorgänge mit einer Rate ab 6,000 Anforderungseinheiten ausführen. Wenn Sie eine Tabelle in einem Keyspace mit gemeinsamem Durchsatz bereitgestellt haben, ist es möglich, dass diese Tabelle zur Laufzeit weniger als 6,000 Anforderungseinheiten zur Verfügung hat.
Stellen Sie sicher, dass die Tabelle, zu der migriert wird, mindestens 6.000 RUS verfügbar hat, wenn Sie die Migration ausführen. Weisen Sie der Tabelle ggf. dedizierte Anforderungseinheiten zu.
Übermäßige Datenschiefe bei großen Datenmengen. Wenn Sie sehr viele Daten in eine bestimmte Tabelle migrieren müssen, aber eine erhebliche Schiefe in den Daten vorliegt (d. h. eine große Anzahl von Datensätzen, die für denselben Partitionsschlüsselwert geschrieben werden), kann es trotzdem zu einer Ratenbegrenzung kommen, selbst wenn Sie mehrere Anforderungseinheiten in Ihrer Tabelle bereitgestellt haben. Anforderungseinheiten werden gleichmäßig auf die physischen Partitionen verteilt werden und eine starke Datenschiefe kann einen Engpass bei Anforderungen an eine einzelne Partition verursachen.
In diesem Szenario reduzieren Sie die Durchsatzeinstellungen in Spark auf ein Minimum und erzwingen Sie eine langsame Ausführung der Migration. Dieses Szenario kann häufiger vorkommen, wenn Referenz- oder Steuertabellen migriert werden, bei denen der Zugriff weniger häufig erfolgt und die Datenschiefe hoch sein kann. Wenn jedoch eine erhebliche Schiefe in einem anderen Tabellentyp vorhanden ist, sollten Sie möglicherweise Ihr Datenmodell überprüfen, um Probleme mit heißen Partitionen für die Workload bei stabilen Vorgängen zu vermeiden.