Actualización e inserción de datos en Azure Cosmos DB for Apache Cassandra desde Spark
SE APLICA A: Cassandra
En este artículo se describe cómo realizar una operación de actualización e inserción de datos en Azure Cosmos DB for Apache Cassandra desde Spark.
Configuración de la API para Cassandra
Establezca la configuración de Spark siguiente en el clúster del cuaderno. Es una actividad única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor
en lugar de connections_per_executor_max
para el conector de Spark 3 (consulte más arriba).
Advertencia
Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.
Dataframe API
Creación de una base de datos
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book
val booksUpsertDF = Seq(
("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()
Actualización e inserción de datos
// Upsert is no different from create
booksUpsertDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.save()
Actualización de datos
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))
RDD API
Nota:
La operación de actualización e inserción desde la API de RDD es igual que la operación de creación.
Pasos siguientes
Continúe con los artículos siguientes para realizar otras operaciones en los datos almacenados en tablas de Azure Cosmos DB for Apache Cassandra:
- Delete operations (Operaciones de eliminación)
- Aggregation operations (Operaciones de agregación)
- Table copy operations (Operaciones de copia en tabla)