Ler dados de tabelas do Azure Cosmos DB for Apache Cassandra usando Spark
APLICA-SE AO: Cassandra
Este artigo descreve como ler dados armazenados no Azure Cosmos DB for Apache Cassandra do Spark.
Configuração da API do Cassandra
Defina a configuração do Spark abaixo no cluster do notebook. Trata-se de uma atividade única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Observação
Se estiver usando o Spark 3.x, você não precisará instalar o auxiliar do Azure Cosmos DB nem o alocador de conexões. Você também deve usar remoteConnectionsPerExecutor
em vez de connections_per_executor_max
para o conector do Spark 3 (veja acima).
Aviso
Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e com o Conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Versões posteriores do Spark e/ou do conector do Cassandra podem não funcionar conforme o esperado.
API de Dataframe
Leia a tabela usando o comando session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Leia a tabela usando spark.read.cassandraForma
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Colunas específicas de leitura na tabela
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Aplicar filtros
Você pode enviar predicados para o banco de dados para permitir consultas do Spark otimizadas. Um predicado é uma condição em uma consulta que retorna true ou false, normalmente localizada na cláusula WHERE. Uma ação de predicado filtra os dados na consulta do banco de dados, reduzindo o número de entradas recuperadas do banco e melhorando o desempenho da consulta. Por padrão, a API do conjunto de dados do Spark enviará automaticamente cláusulas de WHERE válidas para o banco de dados.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
A seção Cassandra Filters
do plano físico inclui o filtro de push down.
API DE RDD
Tabela de leitura
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Colunas específicas de leitura na tabela
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Exibições do SQL
Criar um modo de exibição temporário de um dataframe
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Executar consultas no modo de exibição
select * from books_vw where book_pub_year > 1891
Próximas etapas
A seguir estão artigos adicionais sobre como trabalhar com o Azure Cosmos DB for Apache Cassandra do Spark: