Partilhar via


Atualize dados para o Azure Cosmos DB para Apache Cassandra a partir do Spark

APLICA-SE A: Cassandra

Este artigo descreve como atualizar dados para o Azure Cosmos DB para Apache Cassandra a partir do Spark.

Configuração da API para Cassandra

Defina abaixo a configuração de faísca no cluster de blocos de anotações. É uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Nota

Se você estiver usando o Spark 3.x, não precisará instalar o auxiliar e a fábrica de conexões do Azure Cosmos DB. Você também deve usar remoteConnectionsPerExecutor em vez do connections_per_executor_max conector Spark 3 (veja acima).

Aviso

Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e o correspondente Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Versões posteriores do Spark e/ou do conector Cassandra podem não funcionar como esperado.

API DataFrame

Criar um quadro de dados

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book

val booksUpsertDF = Seq(
    ("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
    ("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
    ("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
    ("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
    ("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
    ("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()

Fazer upsert de dados

// Upsert is no different from create
booksUpsertDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .save()

Atualizar dados

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))

RDD API

Nota

Upsert da API RDD é o mesmo que a operação create

Próximos passos

Prossiga para os seguintes artigos para executar outras operações nos dados armazenados nas tabelas do Azure Cosmos DB para Apache Cassandra: