Partilhar via


Migrar dados de Cassandra para uma conta do Azure Cosmos DB para Apache Cassandra usando o Azure Databricks

APLICA-SE A: Cassandra

A API para Cassandra no Azure Cosmos DB tornou-se uma ótima opção para cargas de trabalho corporativas em execução no Apache Cassandra por vários motivos:

  • Sem sobrecarga de gerenciamento e monitoramento: Ele elimina a sobrecarga de gerenciar e monitorar configurações em arquivos do sistema operacional, JVM e YAML e suas interações.

  • Economia de custos significativa: você pode economizar custos com o Azure Cosmos DB, que inclui o custo de VMs, largura de banda e quaisquer licenças aplicáveis. Você não precisa gerenciar datacenters, servidores, armazenamento SSD, rede e custos de eletricidade.

  • Capacidade de usar código e ferramentas existentes: o Azure Cosmos DB fornece compatibilidade no nível do protocolo wire com SDKs e ferramentas Cassandra existentes. Essa compatibilidade garante que você possa usar sua base de código existente com o Azure Cosmos DB para Apache Cassandra com alterações triviais.

Há muitas maneiras de migrar cargas de trabalho de banco de dados de uma plataforma para outra. O Azure Databricks é uma oferta de plataforma como serviço (PaaS) para o Apache Spark que oferece uma maneira de executar migrações offline em grande escala. Este artigo descreve as etapas necessárias para migrar dados de keyspaces e tabelas Apache Cassandra nativos para o Azure Cosmos DB para Apache Cassandra usando o Azure Databricks.

Pré-requisitos

Provisionar um cluster do Azure Databricks

Você pode seguir as instruções para provisionar um cluster do Azure Databricks. Recomendamos selecionar Databricks runtime version 7.5, que suporta o Spark 3.0.

Captura de tela que mostra a localização da versão de tempo de execução do Databricks.

Adicionar dependências

Você precisa adicionar a biblioteca Apache Spark Cassandra Connector ao cluster para se conectar aos pontos de extremidade Cassandra nativos e do Azure Cosmos DB. No cluster, selecione Bibliotecas>Instalar Novo>Maven e adicione com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 as coordenadas do Maven.

Captura de tela que mostra a busca por pacotes Maven no Databricks.

Selecione Instalar e reinicie o cluster quando a instalação estiver concluída.

Nota

Certifique-se de reiniciar o cluster Databricks após a biblioteca Cassandra Connector ter sido instalada.

Aviso

Os exemplos mostrados neste artigo foram testados com o Spark versão 3.0.1 e o correspondente Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Versões posteriores do Spark e/ou do conector Cassandra podem não funcionar como esperado.

Criar Bloco de Anotações Scala para migração

Crie um bloco de anotações Scala no Databricks. Substitua as configurações Cassandra de origem e de destino pelas credenciais correspondentes e pelos espaços-chave e tabelas de origem e destino. Em seguida, execute o seguinte código:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Nota

Os spark.cassandra.output.batch.size.rows valores e spark.cassandra.output.concurrent.writes e o número de trabalhadores no cluster do Spark são configurações importantes a serem ajustadas para evitar a limitação de taxa. A limitação de taxa acontece quando as solicitações ao Azure Cosmos DB excedem a taxa de transferência provisionada ou as unidades de solicitação (RUs). Talvez seja necessário ajustar essas configurações, dependendo do número de executores no cluster do Spark e, potencialmente, do tamanho (e, portanto, do custo da RU) de cada registro que está sendo gravado nas tabelas de destino.

Resolver problemas

Limitação da taxa (erro 429)

Poderá ver um código de erro 429 ou um texto de erro "a taxa de pedidos é grande", mesmo que tenha reduzido as definições aos seus valores mínimos. Os cenários a seguir podem causar limitação de taxa:

  • A taxa de transferência alocada para a tabela é inferior a 6.000 unidades de solicitação. Mesmo em configurações mínimas, o Spark pode gravar a uma taxa de cerca de 6.000 unidades de solicitação ou mais. Se você provisionou uma tabela em um espaço de chave com taxa de transferência compartilhada, é possível que essa tabela tenha menos de 6.000 RUs disponíveis em tempo de execução.

    Certifique-se de que a tabela para a qual você está migrando tenha pelo menos 6.000 RUs disponíveis quando você executar a migração. Se necessário, aloque unidades de solicitação dedicadas a essa tabela.

  • Inclinação excessiva de dados com grande volume de dados. Se você tiver uma grande quantidade de dados para migrar para uma determinada tabela, mas tiver uma distorção significativa nos dados (ou seja, um grande número de registros sendo gravados para o mesmo valor de chave de partição), ainda poderá experimentar limitação de taxa, mesmo se tiver várias unidades de solicitação provisionadas em sua tabela. As unidades de solicitação são divididas igualmente entre partições físicas, e a distorção de dados pesada pode causar um afunilamento de solicitações para uma única partição.

    Nesse cenário, reduza para configurações de taxa de transferência mínima no Spark e force a migração a ser executada lentamente. Esse cenário pode ser mais comum quando você está migrando tabelas de referência ou controle, onde o acesso é menos frequente e a distorção pode ser alta. No entanto, se uma distorção significativa estiver presente em qualquer outro tipo de tabela, convém revisar seu modelo de dados para evitar problemas de partição ativa para sua carga de trabalho durante operações de estado estacionário.

Próximos passos