Condividi tramite


Operazioni DDL in Azure Cosmos DB for Apache Cassandra da Spark

SI APPLICA A: Cassandra

Questo articolo descrive in dettaglio le operazioni DDL su keyspace e tabelle in Azure Cosmos DB for Apache Cassandra da Spark.

Contesto Spark

Il connettore per l'API for Cassandra richiede che i dettagli della connessione a Cassandra vengano inizializzati nel contesto Spark. Quando si avvia un notebook, il contesto Spark è già inizializzato e non è consigliabile arrestarlo e reinizializzarlo. Una soluzione consiste nell'aggiungere la configurazione dell'istanza dell'API for Cassandra a un livello di cluster, nella configurazione del cluster Spark. Si tratta di un'attività una tantum per ogni cluster. Aggiungere il codice seguente alla configurazione Spark come coppia chiave-valore separata da spazio:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Nota

Se si usa Spark 3.x, non è necessario installare l'helper e la factory di connessione di Azure Cosmos DB. È anche consigliabile usare remoteConnectionsPerExecutor anziché connections_per_executor_max per il connettore Spark 3 (vedere sopra).

Avviso

Gli esempi di Spark 3 illustrati in questo articolo sono stati testati con Spark versione 3.2.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.

Operazioni DDL del keyspace

Creare un keyspace

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Convalidare in cqlsh

Dopo aver eseguito il comando seguente in cqlsh, viene visualizzato il keyspace creato in precedenza.

DESCRIBE keyspaces;

Eliminare un keyspace

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Convalidare in cqlsh

DESCRIBE keyspaces;

Operazioni DDL della tabella

Considerazioni:

  • È possibile assegnare la velocità effettiva a livello di tabella usando l'istruzione CREATE TABLE.
  • Una chiave di partizione può archiviare 20 GB di dati.
  • Un record può archiviare un massimo di 2 MB di dati.
  • Un intervallo di chiavi di partizione può archiviare più chiavi di partizione.

Crea una tabella

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Convalidare in cqlsh

Dopo aver eseguito il comando seguente in cqlsh, viene visualizzata una tabella denominata "books":

USE books_ks;
DESCRIBE books;

La velocità effettiva con provisioning e i valori di durata TTL predefiniti non vengono visualizzati nell'output del comando precedente. È possibile ottenere questi valori dal portale.

Modificare una tabella

È possibile modificare i valori seguenti usando il comando ALTER TABLE:

  • velocità effettiva con provisioning
  • valore durata TTL
    Le modifiche delle colonne non sono attualmente supportate.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Eliminare una tabella

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Convalidare in cqlsh

Dopo aver eseguito il comando seguente in cqlsh, la tabella "books" non è più disponibile:

USE books_ks;
DESCRIBE tables;

Passaggi successivi

Dopo aver creato il keyspace e la tabella, passare agli articoli seguenti per le operazioni CRUD e altro ancora: