Partage via


Opérations d’agrégation sur des tables Azure Cosmos DB for Apache Cassandra à partir de Spark

S’APPLIQUE À : Cassandra

Cet article décrit les opérations d’agrégation de base sur des tables Azure Cosmos DB for Apache Cassandra à partir de Spark.

Notes

Le filtrage côté serveur et l’agrégation côté serveur ne sont pas pris en charge dans Azure Cosmos DB for Apache Cassandra.

API pour la configuration Cassandra

Définissez la configuration spark ci-dessous dans votre cluster de notebooks. Cette activité s’effectue une seule fois.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

Notes

Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor plutôt que connections_per_executor_max pour le connecteur Spark 3 (cf. ci-dessus).

Avertissement

Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.

Générateur d’exemple de données

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Opération count

API pour le jeu de donnée distribué résilient

sc.cassandraTable("books_ks", "books").count

Output:

count: Long = 5

API Dataframe

L’opération de comptage sur les dataframes n’est pas prise en charge. En guise de solution de contournement, l’exemple ci-dessous montre comment exécuter une opération de comptage sur un dataframe après avoir conservé celui-ci en mémoire.

Choisissez une option de stockage parmi les options disponibles suivantes, pour éviter de rencontrer des problèmes de « mémoire insuffisante » :

  • MEMORY_ONLY : option de stockage par défaut. Stocke le jeu de donnée distribué résilient en tant qu’objets Java désérialisés dans la machine virtuelle Java. Si le jeu de donnée distribué résilient ne tient pas dans la mémoire, certaines partitions ne sont pas mises en cache et sont recalculées à la volée chaque fois qu’elles sont nécessaires.

  • MEMORY_AND_DISK : stocke le jeu de donnée distribué résilient en tant qu’objets Java désérialisés dans la machine virtuelle Java. Si le jeu de donnée distribué résilient ne tient pas dans la mémoire, stocke les partitions qui ne tiennent pas sur le disque et, chaque fois que nécessaire, les lit à l’emplacement où elles sont stockées.

  • MEMORY_ONLY_SER (Java/Scala) : stocke le jeu de donnée distribué résilient sous forme d’objets Java sérialisés, à raison d’un tableau de 1 octet par partition. Cette option est moins gourmande en espace par rapport à des objets désérialisés, surtout quand vous utilisez un sérialiseur rapide, mais les opérations de lecture sont plus gourmandes en UC.

  • MEMORY_AND_DISK_SER (Java/Scala) : cette option de stockage ressemble à MEMORY_ONLY_SER, la seule différence étant qu’elle déverse les partitions qui ne tiennent pas dans la mémoire du disque au lieu de les recalculer quand elles sont nécessaires.

  • DISK_ONLY : stocke les partitions du jeu de donnée distribué résilient sur le disque uniquement.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2... : identique aux niveaux ci-dessus, mais réplique chaque partition sur deux nœuds de cluster.

  • OFF_HEAP (option expérimentale) : similaire à MEMORY_ONLY_SER, mais elle stocke les données dans la mémoire hors tas, et nécessite que celle-ci soit activée à l’avance.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Opération mean

API pour le jeu de donnée distribué résilient

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Output:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073

Opération min

API pour le jeu de donnée distribué résilient

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

Output:

11.33

Opération max

API pour le jeu de donnée distribué résilient

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

Output:

22.45

Opération sum

API pour le jeu de donnée distribué résilient

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037

Opération top ou comparable

API pour le jeu de donnée distribué résilient

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

API Dataframe

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Étape suivante