Compartilhar via


Agregar operações no Azure Cosmos DB for Apache Cassandra a partir do Spark

APLICA-SE AO: Cassandra

Este artigo aborda as operações básicas de agregação das tabelas do Azure Cosmos DB for Apache Cassandra a partir do Spark.

Observação

A filtragem do lado do servidor e a agregação no lado do servidor atualmente não têm suporte no Azure Cosmos DB for Apache Cassandra.

Configuração da API do Cassandra

Defina a configuração do Spark abaixo no cluster do notebook. Trata-se de uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

Observação

Se estiver usando o Spark 3.x, você não precisará instalar o auxiliar do Azure Cosmos DB nem o alocador de conexões. Você também deve usar remoteConnectionsPerExecutor em vez de connections_per_executor_max para o conector do Spark 3 (veja acima).

Aviso

Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e com o Conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. As versões mais novas do Spark e/ou do conector do Cassandra podem não funcionar conforme o esperado.

Gerador de dados de exemplo

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Operação de contagem

API DE RDD

sc.cassandraTable("books_ks", "books").count

Saída:

count: Long = 5

API de Dataframe

A contagem de frames de dados não é atualmente suportada. O exemplo abaixo mostra como executar uma contagem do dataframe depois de persistir o dataframe na memória como uma solução alternativa.

Escolha uma opção de armazenamento nas seguintes opções disponíveis, para evitar problemas de "falta de memória":

  • MEMORY_ONLY: é a opção de armazenamento padrão. Armazena o RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, algumas partições não serão armazenadas em cache e serão recalculadas na hora toda vez que forem necessárias.

  • MEMORY_AND_DISK: Armazena o RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, armazene as partições que não cabem no disco e, sempre que necessário, leia-as no local em que estão armazenadas.

  • MEMORY_ONLY_SER (Java/Scala): Armazena o RDD como objetos Java serializados - matriz de um byte por partição. Essa opção é eficiente em termos de espaço quando comparada a objetos desserializados, especialmente ao usar um serializador rápido, mas exige mais uso da CPU para ler.

  • MEMORY_AND_DISK_SER (Java / Scala): Esta opção de armazenamento é como MEMORY_ONLY_SER, a única diferença é que ele despeja partições que não cabem na memória do disco, em vez de recalculá-las quando forem necessárias.

  • DISK_ONLY: armazena as partições RDD somente no disco.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2: o mesmo que os níveis acima, porém replica cada partição em dois nós do cluster.

  • OFF_HEAP (experimental): Semelhante a MEMORY_ONLY_SER, mas armazena os dados na memória fora do heap e exige que a memória fora do heap seja ativada antes do tempo.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Operação de média

API DE RDD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Saída:

res24: Double = 16.016000175476073

API de Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Saída:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

Saída:

16.016000175476073

Operação de mínimo

API DE RDD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Saída:

res31: Float = 11.33

API de Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Saída:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

Saída:

11.33

Operação de máximo

API DE RDD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

API de Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Saída:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

Saída:

22.45

Operação de soma

API DE RDD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Saída:

res46: Double = 80.08000087738037

API de Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Saída:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

Saída:

80.08000087738037

Operação comparável ou superior

API DE RDD

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Saída:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

API de Dataframe

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Saída:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Próxima etapa