Freigeben über


Erstellen oder Einfügen von Daten in Azure Cosmos DB for Apache Cassandra in Spark

GILT FÜR: Cassandra

Dieser Artikel erläutert, wie Sie in Spark Beispieldaten in eine Tabelle in Azure Cosmos DB for Apache Cassandra einfügen.

API für Cassandra-Konfiguration

Legen Sie in Ihrem Notebookcluster die folgende Spark-Konfiguration fest. Dieser Schritt muss nur einmal ausgeführt werden.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Hinweis

Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor anstelle von connections_per_executor_max für den Spark 3-Connector verwenden (siehe oben).

Warnung

Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.

Datenrahmen-API

Erstellen eines Datenrahmens mit Beispieldaten

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Hinweis

Die Funktion „Erstellen, falls nicht vorhanden“ wird auf Zeilenebene noch nicht unterstützt.

Persistentes Speichern von Daten in Azure Cosmos DB for Apache Cassandra

Beim Speichern von Daten können Sie auch Einstellungen für die Gültigkeitsdauer und die Konsistenzrichtlinien festlegen, wie im folgenden Beispiel gezeigt:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Überprüfen in cqlsh

use books_ks;
select * from books;

RDD-API (Resilient Distributed Dataset)

Erstellen eines RDD mit Beispieldaten

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Hinweis

Die Funktion „Erstellen, falls nicht vorhanden“ wird noch nicht unterstützt.

Persistentes Speichern von Daten in Azure Cosmos DB for Apache Cassandra

Beim Speichern von Daten in der API für Cassandra können Sie auch Einstellungen für Gültigkeitsdauer und Konsistenzrichtlinien festlegen, wie im folgenden Beispiel gezeigt:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Überprüfen in cqlsh

use books_ks;
select * from books;

Nächste Schritte

Nachdem Sie Daten in die Azure Cosmos DB for Apache Cassandra-Tabelle eingefügt haben, lesen Sie die folgenden Artikel, um andere Vorgänge auf die in Azure Cosmos DB for Apache Cassandra gespeicherten Daten anzuwenden: