Creación e inserción de datos en Azure Cosmos DB for Apache Cassandra desde Spark
SE APLICA A: Cassandra
En este artículo se describe cómo insertar datos de ejemplo en una tabla de Azure Cosmos DB for Apache Cassandra desde Spark.
Configuración de la API para Cassandra
Establezca la configuración de Spark siguiente en el clúster del cuaderno. Es una actividad única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor
en lugar de connections_per_executor_max
para el conector de Spark 3 (consulte más arriba).
Advertencia
Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.
Dataframe API
Creación de una trama con datos de ejemplo
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
Nota:
Aún no se admite la funcionalidad "Crear si no existe" a nivel de registro.
Persista en Azure Cosmos DB for Apache Cassandra
Al guardar los datos, también puede establecer el período de vida y la configuración de la directiva de coherencia como se muestra en el siguiente ejemplo:
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Validar en cqlsh
use books_ks;
select * from books;
Resilient Distributed Database (RDD) API
Creación de una RDD con datos de ejemplo
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
Nota:
Aún no se admite la funcionalidad "Crear si no existe".
Persista en Azure Cosmos DB for Apache Cassandra
Al guardar los datos en la API para Cassandra, también puede establecer el período de vida y la configuración de la directiva de coherencia como se muestra en el siguiente ejemplo:
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
Validar en cqlsh
use books_ks;
select * from books;
Pasos siguientes
Tras insertar los datos en la tabla de Azure Cosmos DB for Apache Cassandra, continúe con los siguientes artículos para realizar otras operaciones en los datos almacenados en Azure Cosmos DB for Apache Cassandra:
- Lee operaciones.
- Upsert operations (Operaciones de upsert)
- Delete operations (Operaciones de eliminación)
- Aggregation operations (Operaciones de agregación)
- Table copy operations (Operaciones de copia en tabla)