Operacje kopiowania tabel w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark
DOTYCZY: Kasandra
W tym artykule opisano sposób kopiowania danych między tabelami w usłudze Azure Cosmos DB for Apache Cassandra z platformy Spark. Polecenia opisane w tym artykule mogą również służyć do kopiowania danych z tabel apache Cassandra do usługi Azure Cosmos DB dla tabel Apache Cassandra.
Interfejs API dla konfiguracji bazy danych Cassandra
Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Uwaga
Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor
zamiast connections_per_executor_max
łącznika Spark 3 (zobacz powyżej).
Ostrzeżenie
Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.
Wstawianie przykładowych danych
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Kopiowanie danych między tabelami
Kopiowanie danych między tabelami (tabela docelowa istnieje)
//1) Create destination table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books_copy(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
//2) Read from one table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//3) Save to destination table
readBooksDF.write
.cassandraFormat("books_copy", "books_ks", "")
.save()
//4) Validate copy to destination table
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_copy", "keyspace" -> "books_ks"))
.load
.show
Kopiowanie danych między tabelami (tabela docelowa nie istnieje)
import com.datastax.spark.connector._
//1) Read from source table
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
//2) Creates an empty table in the keyspace based off of source table
val newBooksDF = readBooksDF
newBooksDF.createCassandraTable(
"books_ks",
"books_new",
partitionKeyColumns = Some(Seq("book_id"))
//clusteringKeyColumns = Some(Seq("some column"))
)
//3) Saves the data from the source table into the newly created table
newBooksDF.write
.cassandraFormat("books_new", "books_ks","")
.mode(SaveMode.Append)
.save()
//4) Validate table creation and data load
sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books_new", "keyspace" -> "books_ks"))
.load
.show
Dane wyjściowe —
+-------+------------------+--------------------+----------+-------------+
|book_id| book_author| book_name|book_price|book_pub_year|
+-------+------------------+--------------------+----------+-------------+
| b00300|Arthur Conan Doyle|The hounds of Bas...| 12.25| 1901|
| b00001|Arthur Conan Doyle| A study in scarlet| 11.33| 1887|
| b00023|Arthur Conan Doyle| A sign of four| 22.45| 1890|
| b00501|Arthur Conan Doyle|The memoirs of Sh...| 14.22| 1893|
| b01001|Arthur Conan Doyle|The adventures of...| 19.83| 1892|
+-------+------------------+--------------------+----------+-------------+
import com.datastax.spark.connector._
readBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
newBooksDF: org.apache.spark.sql.DataFrame = [book_id: string, book_author: string ... 3 more fields]
Następne kroki
- Rozpocznij pracę z tworzeniem interfejsu API dla konta, bazy danych i tabeli Cassandra przy użyciu aplikacji Java.
- Załaduj przykładowe dane do interfejsu API dla tabeli Cassandra przy użyciu aplikacji Java.
- Wykonywanie zapytań o dane z interfejsu API dla konta Cassandra przy użyciu aplikacji Java.