Compartilhar via


Operações de DDL no Azure Cosmos DB for Apache Cassandra por meio do Spark

APLICA-SE AO: Cassandra

Este artigo oferece detalhes sobre as operações de DDL de tabela e de keyspace no Azure Cosmos DB for Apache Cassandra por meio do Spark.

Contexto do Spark

O conector da API do Cassandra requer os detalhes de conexão do Cassandra para ser inicializado no contexto do Spark. Quando você inicia um notebook, o contexto do Spark já é inicializado e não é aconselhável parar e reinicializá-lo. Uma solução é adicionar a configuração de instância de API para Cassandra em um nível de cluster na configuração do cluster Spark. Trata-se de uma atividade única por cluster. Adicione o seguinte código para a configuração do Spark como par de valor de chave separado do espaço:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Observação

Se estiver usando o Spark 3.x, você não precisará instalar o auxiliar do Azure Cosmos DB nem o alocador de conexões. Você também deve usar remoteConnectionsPerExecutor em vez de connections_per_executor_max para o conector do Spark 3 (veja acima).

Aviso

Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e o Conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. As versões posteriores do Spark e/ou do conector do Cassandra podem não funcionar conforme o esperado.

Operações de DDL de keyspace

Criar um keyspace

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Validar no cqlsh

Execute o comando a seguir no cqlsh e você deverá ver o keyspace criado anteriormente.

DESCRIBE keyspaces;

Remover um keyspace

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Validar no cqlsh

DESCRIBE keyspaces;

Operações de DDL de tabela

Considerações:

  • A taxa de transferência pode ser atribuída no nível de tabela usando a instrução create table.
  • Uma chave de partição pode armazenar 20 GB de dados.
  • Um registro pode armazenar um máximo de 2 MB de dados.
  • Um intervalo de chaves de partição pode armazenar várias chaves de partição.

Criar uma tabela

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Validar no cqlsh

Execute o comando a seguir no cqlsh e você deverá ver a tabela chamada “books”:

USE books_ks;
DESCRIBE books;

Os valores de TTL padrão e a taxa de transferência provisionada não são mostrados na saída do comando anterior. Você pode obter esses valores no portal.

Alter table

Você pode alterar os valores a seguir usando o comando alter table:

  • taxa de transferência provisionada
  • valor de vida útil
    Não há suporte para alterações a colunas.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Remover tabela

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Validar no cqlsh

Execute o comando a seguir no cqlsh e você verá que a tabela "books" não está mais disponível:

USE books_ks;
DESCRIBE tables;

Próximas etapas

Depois de criar o keyspace e a tabela, prossiga para os artigos a seguir para operações de CRUD e muito mais: