Eseguire l'upsert dei dati in Azure Cosmos DB for Apache Cassandra da Spark
SI APPLICA A: Cassandra
Questo articolo descrive come eseguire l'upsert dei dati in Azure Cosmos DB for Apache Cassandra da Spark.
Configurazione dell'API for Cassandra
Impostare la configurazione Spark seguente nel cluster del notebook. Si tratta di un'attività una tantum.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Se si usa Spark 3.x, non è necessario installare l'helper e la factory di connessione di Azure Cosmos DB. È anche consigliabile usare remoteConnectionsPerExecutor
anziché connections_per_executor_max
per il connettore Spark 3 (vedere sopra).
Avviso
I campioni di Spark 3 illustrati in questo articolo sono stati testati con Spark versione 3.2.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.
API dataframe
Crea un dataframe
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book
val booksUpsertDF = Seq(
("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()
Eseguire l'upsert dei dati
// Upsert is no different from create
booksUpsertDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.save()
Aggiornamento dei dati
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))
API RDD
Nota
L'upsert dall'API RDD è uguale all'operazione di creazione
Passaggi successivi
Passare agli articoli seguenti per eseguire altre operazioni sui dati archiviati nelle tabelle Azure Cosmos DB for Apache Cassandra: