แก้ไข

แชร์ผ่าน


DDL operations in Azure Cosmos DB for Apache Cassandra from Spark

APPLIES TO: Cassandra

This article details keyspace and table DDL operations against Azure Cosmos DB for Apache Cassandra from Spark.

Spark context

The connector for API for Cassandra requires the Cassandra connection details to be initialized as part of the spark context. When you launch a notebook, the spark context is already initialized, and it isn't advisable to stop and reinitialize it. One solution is to add the API for Cassandra instance configuration at a cluster level, in the cluster spark configuration. It's one-time activity per cluster. Add the following code to the Spark configuration as a space separated key value pair:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Note

If you are using Spark 3.x, you do not need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (see above).

Warning

The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Later versions of Spark and/or the Cassandra connector may not function as expected.

Keyspace DDL operations

Create a keyspace

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Validate in cqlsh

Run the following command in cqlsh and you should see the keyspace you created earlier.

DESCRIBE keyspaces;

Drop a keyspace

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Validate in cqlsh

DESCRIBE keyspaces;

Table DDL operations

Considerations:

  • Throughput can be assigned at the table level by using the create table statement.
  • One partition key can store 20 GB of data.
  • One record can store a maximum of 2 MB of data.
  • One partition key range can store multiple partition keys.

Create a table

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Validate in cqlsh

Run the following command in cqlsh and you should see the table named “books:

USE books_ks;
DESCRIBE books;

Provisioned throughput and default TTL values aren't shown in the output of the previous command, you can get these values from the portal.

Alter table

You can alter the following values by using the alter table command:

  • provisioned throughput
  • time-to-live value
    Column changes are currently not supported.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Drop table

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Validate in cqlsh

Run the following command in cqlsh and you should see that the “books” table is no longer available:

USE books_ks;
DESCRIBE tables;

Next steps

After creating the keyspace and the table, proceed to the following articles for CRUD operations and more: