Compartir vía


Operaciones DDL en Azure Cosmos DB for Apache Cassandra de Spark

SE APLICA A: Cassandra

En este artículo se describe el espacio de claves y la tabla de operaciones DDL en Azure Cosmos DB for Apache Cassandra de Spark.

Contexto de Spark

El conector para la API de Cassandra requiere que los detalles de la conexión de Cassandra se inicialicen como parte del contexto de Spark. Cuando inicia un cuaderno, el contexto de Spark ya está inicializado y no es aconsejable pararlo y reiniciarlo. Una solución es agregar la configuración de la instancia de la API de Cassandra en un nivel de clúster, en la configuración de clúster de Spark. Esto se hace una sola vez por cada clúster. Agregue el código siguiente a la configuración de Spark como un par clave-valor separado por espacios:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Nota

Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor en lugar de connections_per_executor_max para el conector de Spark 3 (consulte más arriba).

Advertencia

Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.

Operaciones DDL de Keyspace

Crear un espacio de claves

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Validar en cqlsh

Ejecute el siguiente comando en cqlsh y verá el espacio de claves que creó anteriormente.

DESCRIBE keyspaces;

Colocar un espacio de claves

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Validar en cqlsh

DESCRIBE keyspaces;

Operaciones DDL de tabla

Consideraciones:

  • El rendimiento puede asignarse a nivel de tabla; para ello, use la instrucción create table.
  • Una clave de partición puede almacenar 20 GB de datos.
  • Un registro puede almacenar un máximo de 2 MB de datos.
  • Un intervalo de claves de partición puede almacenar varias claves de partición.

Creación de una tabla

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Validar en cqlsh

Ejecute el siguiente comando en cqlsh y verá la tabla denominada "books:"

USE books_ks;
DESCRIBE books;

El rendimiento previsto y los valores TTL predeterminados no se muestran en la salida del comando anterior, pero puede obtener estos valores en el portal.

Alter table

Puede modificar los siguientes valores mediante el comando alter table:

  • rendimiento aprovisionado
  • valor del período de vida
    Actualmente no se admiten los cambios en columnas.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Quitar tabla

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Validar en cqlsh

Ejecute el siguiente comando en cqlsh y verá que la tabla "books" ya no está disponible:

USE books_ks;
DESCRIBE tables;

Pasos siguientes

Después de crear el espacio de claves y la tabla, lea los siguientes artículos referentes a las operaciones CRUD y mucho más: