Upsert-data till Azure Cosmos DB för Apache Cassandra från Spark
GÄLLER FÖR: Kassandra
Den här artikeln beskriver hur du uppgraderar data till Azure Cosmos DB för Apache Cassandra från Spark.
API för Cassandra-konfiguration
Ställ in spark-konfigurationen nedan i notebook-klustret. Det är en engångsaktivitet.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Kommentar
Om du använder Spark 3.x behöver du inte installera Azure Cosmos DB-hjälpen och anslutningsfabriken. Du bör också använda remoteConnectionsPerExecutor
i stället connections_per_executor_max
för för Spark 3-anslutningsappen (se ovan).
Varning
Spark 3-exemplen som visas i den här artikeln har testats med Spark version 3.2.1 och motsvarande Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Senare versioner av Spark och/eller Cassandra-anslutningsappen kanske inte fungerar som förväntat.
Dataframe-API
Skapa en dataram
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book
val booksUpsertDF = Seq(
("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()
Upserta data
// Upsert is no different from create
booksUpsertDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.save()
Uppdatera data
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))
RDD-API
Kommentar
Upsert från RDD-API:et är samma som create-åtgärden
Nästa steg
Fortsätt till följande artiklar för att utföra andra åtgärder på data som lagras i Azure Cosmos DB för Apache Cassandra-tabeller: