Создание и вставка данных в Azure Cosmos DB для Apache Cassandra из Spark
Область применения: Кассандра
В этой статье описывается, как вставить примеры данных в таблицу в Azure Cosmos DB для Apache Cassandra из Spark.
API для конфигурации Cassandra
Задайте следующую конфигурацию Spark в кластере записных книжек. Это разовое действие.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Примечание.
Если вы используете Spark 3.x, вам не нужно устанавливать вспомогательный сервер Azure Cosmos DB и фабрику подключений. Также необходимо использовать remoteConnectionsPerExecutor
вместо connections_per_executor_max
для соединителя Spark 3 (см. выше).
Предупреждение
Примеры для Spark 3 в этой статье протестированы с использованием Spark версии 3.2.1 и соответствующего соединителя Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Более поздние версии Spark и (или) соединителя Cassandra могут работать непредсказуемым образом.
API Dataframe
Создание кадра данных с образцом данных
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
Примечание.
Функциональная возможность "Создать, если не существует" на уровне строк пока не поддерживается.
Сохранение в Azure Cosmos DB для Apache Cassandra
При сохранении данных, можно также задать время существования и параметры политики согласованности, как показано в следующем примере.
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Проверка в cqlsh
use books_ks;
select * from books;
API устойчивой распределенной базы данных (RDD)
Создание базы данных с образцами данных
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
Примечание.
Функциональная возможность "Создать, если не существует" пока не поддерживается.
Сохранение в Azure Cosmos DB для Apache Cassandra
При сохранении данных в API для Cassandra можно также задать параметры политики согласованности во времени и согласованности, как показано в следующем примере:
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
Проверка в cqlsh
use books_ks;
select * from books;
Следующие шаги
После вставки данных в таблицу Azure Cosmos DB для Apache Cassandra перейдите к следующим статьям, чтобы выполнить другие операции с данными, хранящимися в Azure Cosmos DB для Apache Cassandra: