Tabellenkopiervorgänge in Azure Cosmos DB for Apache Cassandra in Spark
GILT FÜR: Cassandra
In diesem Artikel erfahren Sie, wie Sie in Spark Daten zwischen Tabellen in Azure Cosmos DB for Apache Cassandra kopieren können. Die in diesem Artikel beschriebenen Befehle können auch verwendet werden, um Daten aus Apache Cassandra-Tabellen in Azure Cosmos DB for Apache Cassandra-Tabellen zu kopieren.
API für Cassandra-Konfiguration
Legen Sie in Ihrem Notebookcluster die folgende Spark-Konfiguration fest. Dieser Schritt muss nur einmal ausgeführt werden.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Hinweis
Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor
anstelle von connections_per_executor_max
für den Spark 3-Connector verwenden (siehe oben).
Warnung
Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Einfügen von Beispieldaten
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Kopieren von Daten zwischen Tabellen
Kopieren von Daten zwischen Tabellen (Zieltabelle ist vorhanden)
//1) Create destination table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books_copy(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
//2) Read from one table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//3) Save to destination table
readBooksDF.write
.cassandraFormat("books_copy", "books_ks", "")
.save()
//4) Validate copy to destination table
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_copy", "keyspace" -> "books_ks"))
.load
.show
Kopieren von Daten zwischen Tabellen (Zieltabelle ist nicht vorhanden)
import com.datastax.spark.connector._
//1) Read from source table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//2) Creates an empty table in the keyspace based off of source table
val newBooksDF = readBooksDF
newBooksDF.createCassandraTable(
"books_ks",
"books_new",
partitionKeyColumns = Some(Seq("book_id"))
//clusteringKeyColumns = Some(Seq("some column"))
)
//3) Saves the data from the source table into the newly created table
newBooksDF.write
.cassandraFormat("books_new", "books_ks","")
.mode(SaveMode.Append)
.save()
//4) Validate table creation and data load
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_new", "keyspace" -> "books_ks"))
.load
.show
Die Ausgabe sieht wie folgt aus:
+-------+------------------+--------------------+----------+-------------+
|book_id| book_author| book_name|book_price|book_pub_year|
+-------+------------------+--------------------+----------+-------------+
| b00300|Arthur Conan Doyle|The hounds of Bas...| 12.25| 1901|
| b00001|Arthur Conan Doyle| A study in scarlet| 11.33| 1887|
| b00023|Arthur Conan Doyle| A sign of four| 22.45| 1890|
| b00501|Arthur Conan Doyle|The memoirs of Sh...| 14.22| 1893|
| b01001|Arthur Conan Doyle|The adventures of...| 19.83| 1892|
+-------+------------------+--------------------+----------+-------------+
import com.datastax.spark.connector._
readBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
newBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
Nächste Schritte
- Erste Schritte mit dem Erstellen eines API für Cassandra-Kontos, einer Datenbank und einer Tabelle mithilfe einer Java-Anwendung
- Laden von Beispieldaten in eine API für Cassandra-Tabelle mithilfe einer Java-Anwendung
- Abfragen von Daten aus dem API für Cassandra-Konto mithilfe einer Java-Anwendung