Operazioni DDL in Azure Cosmos DB for Apache Cassandra da Spark
SI APPLICA A: Cassandra
Questo articolo descrive in dettaglio le operazioni DDL su keyspace e tabelle in Azure Cosmos DB for Apache Cassandra da Spark.
Contesto Spark
Il connettore per l'API for Cassandra richiede che i dettagli della connessione a Cassandra vengano inizializzati nel contesto Spark. Quando si avvia un notebook, il contesto Spark è già inizializzato e non è consigliabile arrestarlo e reinizializzarlo. Una soluzione consiste nell'aggiungere la configurazione dell'istanza dell'API for Cassandra a un livello di cluster, nella configurazione del cluster Spark. Si tratta di un'attività una tantum per ogni cluster. Aggiungere il codice seguente alla configurazione Spark come coppia chiave-valore separata da spazio:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Configurazione relativa all'API for Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Nota
Se si usa Spark 3.x, non è necessario installare l'helper e la factory di connessione di Azure Cosmos DB. È anche consigliabile usare remoteConnectionsPerExecutor
anziché connections_per_executor_max
per il connettore Spark 3 (vedere sopra).
Avviso
Gli esempi di Spark 3 illustrati in questo articolo sono stati testati con Spark versione 3.2.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.
Operazioni DDL del keyspace
Creare un keyspace
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Convalidare in cqlsh
Dopo aver eseguito il comando seguente in cqlsh, viene visualizzato il keyspace creato in precedenza.
DESCRIBE keyspaces;
Eliminare un keyspace
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Convalidare in cqlsh
DESCRIBE keyspaces;
Operazioni DDL della tabella
Considerazioni:
- È possibile assegnare la velocità effettiva a livello di tabella usando l'istruzione CREATE TABLE.
- Una chiave di partizione può archiviare 20 GB di dati.
- Un record può archiviare un massimo di 2 MB di dati.
- Un intervallo di chiavi di partizione può archiviare più chiavi di partizione.
Crea una tabella
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Convalidare in cqlsh
Dopo aver eseguito il comando seguente in cqlsh, viene visualizzata una tabella denominata "books":
USE books_ks;
DESCRIBE books;
La velocità effettiva con provisioning e i valori di durata TTL predefiniti non vengono visualizzati nell'output del comando precedente. È possibile ottenere questi valori dal portale.
Modificare una tabella
È possibile modificare i valori seguenti usando il comando ALTER TABLE:
- velocità effettiva con provisioning
- valore durata TTL
Le modifiche delle colonne non sono attualmente supportate.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Eliminare una tabella
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Convalidare in cqlsh
Dopo aver eseguito il comando seguente in cqlsh, la tabella "books" non è più disponibile:
USE books_ks;
DESCRIBE tables;
Passaggi successivi
Dopo aver creato il keyspace e la tabella, passare agli articoli seguenti per le operazioni CRUD e altro ancora: