Compartilhar via


Executar upsert no Azure Cosmos DB for Apache Cassandra usando o Spark

APLICA-SE AO: Cassandra

Este artigo descreve como executar upsert em dados no Azure Cosmos DB for Apache Cassandra usando o Spark.

Configuração da API do Cassandra

Defina a configuração do Spark abaixo no cluster do notebook. Trata-se de uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Observação

Se estiver usando o Spark 3.x, você não precisará instalar o auxiliar do Azure Cosmos DB nem o alocador de conexões. Você também deve usar remoteConnectionsPerExecutor em vez de connections_per_executor_max para o conector do Spark 3 (veja acima).

Aviso

Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e com o Conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Versões posteriores do Spark e/ou do conector do Cassandra podem não funcionar conforme o esperado.

API de Dataframe

Crie um dataframe

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book

val booksUpsertDF = Seq(
    ("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
    ("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
    ("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
    ("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
    ("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
    ("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()

Upsert data

// Upsert is no different from create
booksUpsertDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .save()

Atualizar dados

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))

API DE RDD

Observação

Upsert da API RDD é o mesmo que a operação de criação

Próximas etapas

Continue com os seguintes artigos para executar outras operações nos dados armazenados nas tabelas do Azure Cosmos DB for Apache Cassandra: