Поделиться через


Upsert data в Azure Cosmos DB для Apache Cassandra из Spark

Область применения: Кассандра

В этой статье описывается, как добавлять данные в Azure Cosmos DB для Apache Cassandra из Spark.

API для конфигурации Cassandra

Задайте следующую конфигурацию Spark в кластере записных книжек. Это разовое действие.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Примечание.

Если вы используете Spark 3.x, вам не нужно устанавливать вспомогательный сервер Azure Cosmos DB и фабрику подключений. Также необходимо использовать remoteConnectionsPerExecutor вместо connections_per_executor_max для соединителя Spark 3 (см. выше).

Предупреждение

Примеры для Spark 3 в этой статье протестированы с использованием Spark версии 3.2.1 и соответствующего соединителя Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Более поздние версии Spark и (или) соединителя Cassandra могут работать непредсказуемым образом.

API Dataframe

Создание кадра данных

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// (1) Update: Changing author name to include prefix of "Sir"
// (2) Insert: adding a new book

val booksUpsertDF = Seq(
    ("b00001", "Sir Arthur Conan Doyle", "A study in scarlet", 1887),
    ("b00023", "Sir Arthur Conan Doyle", "A sign of four", 1890),
    ("b01001", "Sir Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
    ("b00501", "Sir Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
    ("b00300", "Sir Arthur Conan Doyle", "The hounds of Baskerville", 1901),
    ("b09999", "Sir Arthur Conan Doyle", "The return of Sherlock Holmes", 1905)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year")
booksUpsertDF.show()

Операция upsert с данными

// Upsert is no different from create
booksUpsertDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .save()

Обновление данных

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

//This runs on the driver, leverage only for one off updates
cdbConnector.withSessionDo(session => session.execute("update books_ks.books set book_price=99.33 where book_id ='b00300' and book_pub_year = 1901;"))

API RRD

Примечание.

Операция upsert из API RRD аналогична операции создания

Следующие шаги

Перейдите к следующим статьям, чтобы выполнить другие операции с данными, хранящимися в Azure Cosmos DB для таблиц Apache Cassandra: