Operaciones DDL en Azure Cosmos DB for Apache Cassandra de Spark
SE APLICA A: Cassandra
En este artículo se describe el espacio de claves y la tabla de operaciones DDL en Azure Cosmos DB for Apache Cassandra de Spark.
Contexto de Spark
El conector para la API de Cassandra requiere que los detalles de la conexión de Cassandra se inicialicen como parte del contexto de Spark. Cuando inicia un cuaderno, el contexto de Spark ya está inicializado y no es aconsejable pararlo y reiniciarlo. Una solución es agregar la configuración de la instancia de la API de Cassandra en un nivel de clúster, en la configuración de clúster de Spark. Esto se hace una sola vez por cada clúster. Agregue el código siguiente a la configuración de Spark como un par clave-valor separado por espacios:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
API para la configuración relacionada con Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Nota
Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor
en lugar de connections_per_executor_max
para el conector de Spark 3 (consulte más arriba).
Advertencia
Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.
Operaciones DDL de Keyspace
Crear un espacio de claves
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Validar en cqlsh
Ejecute el siguiente comando en cqlsh y verá el espacio de claves que creó anteriormente.
DESCRIBE keyspaces;
Colocar un espacio de claves
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Validar en cqlsh
DESCRIBE keyspaces;
Operaciones DDL de tabla
Consideraciones:
- El rendimiento puede asignarse a nivel de tabla; para ello, use la instrucción create table.
- Una clave de partición puede almacenar 20 GB de datos.
- Un registro puede almacenar un máximo de 2 MB de datos.
- Un intervalo de claves de partición puede almacenar varias claves de partición.
Creación de una tabla
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Validar en cqlsh
Ejecute el siguiente comando en cqlsh y verá la tabla denominada "books:"
USE books_ks;
DESCRIBE books;
El rendimiento previsto y los valores TTL predeterminados no se muestran en la salida del comando anterior, pero puede obtener estos valores en el portal.
Alter table
Puede modificar los siguientes valores mediante el comando alter table:
- rendimiento aprovisionado
- valor del período de vida
Actualmente no se admiten los cambios en columnas.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Quitar tabla
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Validar en cqlsh
Ejecute el siguiente comando en cqlsh y verá que la tabla "books" ya no está disponible:
USE books_ks;
DESCRIBE tables;
Pasos siguientes
Después de crear el espacio de claves y la tabla, lea los siguientes artículos referentes a las operaciones CRUD y mucho más:
- Create/insert operations (Operaciones de creación e inserción)
- Lee operaciones.
- Upsert operations (Operaciones de upsert)
- Delete operations (Operaciones de eliminación)
- Aggregation operations (Operaciones de agregación)
- Table copy operations (Operaciones de copia en tabla)