Agregar operações no Azure Cosmos DB for Apache Cassandra a partir do Spark
APLICA-SE AO: Cassandra
Este artigo aborda as operações básicas de agregação das tabelas do Azure Cosmos DB for Apache Cassandra a partir do Spark.
Observação
A filtragem do lado do servidor e a agregação no lado do servidor atualmente não têm suporte no Azure Cosmos DB for Apache Cassandra.
Configuração da API do Cassandra
Defina a configuração do Spark abaixo no cluster do notebook. Trata-se de uma atividade única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Observação
Se estiver usando o Spark 3.x, você não precisará instalar o auxiliar do Azure Cosmos DB nem o alocador de conexões. Você também deve usar remoteConnectionsPerExecutor
em vez de connections_per_executor_max
para o conector do Spark 3 (veja acima).
Aviso
Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e com o Conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. As versões mais novas do Spark e/ou do conector do Cassandra podem não funcionar conforme o esperado.
Gerador de dados de exemplo
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operação de contagem
API DE RDD
sc.cassandraTable("books_ks", "books").count
Saída:
count: Long = 5
API de Dataframe
A contagem de frames de dados não é atualmente suportada. O exemplo abaixo mostra como executar uma contagem do dataframe depois de persistir o dataframe na memória como uma solução alternativa.
Escolha uma opção de armazenamento nas seguintes opções disponíveis, para evitar problemas de "falta de memória":
MEMORY_ONLY: é a opção de armazenamento padrão. Armazena o RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, algumas partições não serão armazenadas em cache e serão recalculadas na hora toda vez que forem necessárias.
MEMORY_AND_DISK: Armazena o RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, armazene as partições que não cabem no disco e, sempre que necessário, leia-as no local em que estão armazenadas.
MEMORY_ONLY_SER (Java/Scala): Armazena o RDD como objetos Java serializados - matriz de um byte por partição. Essa opção é eficiente em termos de espaço quando comparada a objetos desserializados, especialmente ao usar um serializador rápido, mas exige mais uso da CPU para ler.
MEMORY_AND_DISK_SER (Java / Scala): Esta opção de armazenamento é como MEMORY_ONLY_SER, a única diferença é que ele despeja partições que não cabem na memória do disco, em vez de recalculá-las quando forem necessárias.
DISK_ONLY: armazena as partições RDD somente no disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2: o mesmo que os níveis acima, porém replica cada partição em dois nós do cluster.
OFF_HEAP (experimental): Semelhante a MEMORY_ONLY_SER, mas armazena os dados na memória fora do heap e exige que a memória fora do heap seja ativada antes do tempo.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Operação de média
API DE RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Saída:
res24: Double = 16.016000175476073
API de Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Saída:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Saída:
16.016000175476073
Operação de mínimo
API DE RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Saída:
res31: Float = 11.33
API de Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Saída:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Saída:
11.33
Operação de máximo
API DE RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
API de Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Saída:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Saída:
22.45
Operação de soma
API DE RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Saída:
res46: Double = 80.08000087738037
API de Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Saída:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Saída:
80.08000087738037
Operação comparável ou superior
API DE RDD
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Saída:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
API de Dataframe
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Saída:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;