Créer/insérer des données dans Azure Cosmos DB for Apache Cassandra à partir de Spark
S’APPLIQUE À : Cassandra
Cet article explique comment insérer des exemples de données dans une table dans Azure Cosmos DB for Apache Cassandra à partir de Spark.
API pour la configuration Cassandra
Définissez la configuration spark ci-dessous dans votre cluster de notebooks. Cette activité s’effectue une seule fois.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notes
Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor
plutôt que connections_per_executor_max
pour le connecteur Spark 3 (cf. ci-dessus).
Avertissement
Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.
API Dataframe
Créer un dataframe avec des exemples de données
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
Notes
La fonctionnalité « Créer si inexistant », au niveau des lignes, n’est pas encore prise en charge.
Persistance vers Azure Cosmos DB for Apache Cassandra
Lors de l’enregistrement de données, vous pouvez également définir des paramètres de durée de vie et de stratégie de cohérence, comme indiqué dans l’exemple suivant :
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Valider dans cqlsh
use books_ks;
select * from books;
API RDD (base de données distribuée résiliente)
Créer une base de données distribuée résiliente avec des exemples de données
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
Notes
La fonctionnalité « Créer si inexistant » n’est pas encore prise en charge.
Persistance vers Azure Cosmos DB for Apache Cassandra
Lors de l’enregistrement de données dans l’API pour Cassandra, vous pouvez également définir des paramètres de durée de vie et de stratégie de cohérence, comme indiqué dans l’exemple suivant :
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
Valider dans cqlsh
use books_ks;
select * from books;
Étapes suivantes
Après avoir inséré des données dans la table Azure Cosmos DB for Apache Cassandra, passez aux articles suivants pour effectuer d’autres opérations sur les données stockées dans Azure Cosmos DB for Apache Cassandra :