Udostępnij za pośrednictwem


Tworzenie/wstawianie danych do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark

DOTYCZY: Kasandra

W tym artykule opisano sposób wstawiania przykładowych danych do tabeli w usłudze Azure Cosmos DB for Apache Cassandra z platformy Spark.

Interfejs API dla konfiguracji bazy danych Cassandra

Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Uwaga

Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor zamiast connections_per_executor_max łącznika Spark 3 (zobacz powyżej).

Ostrzeżenie

Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.

Interfejs API ramki danych

Tworzenie ramki danych z przykładowymi danymi

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Uwaga

Funkcja "Utwórz, jeśli nie istnieje" na poziomie wiersza nie jest jeszcze obsługiwana.

Utrwalanie w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra

Podczas zapisywania danych można również ustawić ustawienia zasad czasu wygaśnięcia i spójności, jak pokazano w poniższym przykładzie:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Weryfikowanie w języku cqlsh

use books_ks;
select * from books;

Interfejs API odpornej rozproszonej bazy danych (RDD)

Tworzenie rdD z przykładowymi danymi

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Uwaga

Utwórz, jeśli nie istnieje funkcja nie jest jeszcze obsługiwana.

Utrwalanie w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra

Podczas zapisywania danych w interfejsie API dla bazy danych Cassandra można również ustawić ustawienia zasad czasu wygaśnięcia i spójności, jak pokazano w poniższym przykładzie:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Weryfikowanie w języku cqlsh

use books_ks;
select * from books;

Następne kroki

Po wstawieniu danych do tabeli usługi Azure Cosmos DB for Apache Cassandra przejdź do następujących artykułów, aby wykonać inne operacje na danych przechowywanych w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra: