Operazioni di aggregazione su tabelle di Azure Cosmos DB for Apache Cassandra da Spark
SI APPLICA A: Cassandra
Questo articolo descrive le operazioni di aggregazione di base per le tabelle di Azure Cosmos DB for Apache Cassandra da Spark.
Nota
Il filtro sul lato server e l'aggregazione sul lato server attualmente non sono supportati in Azure Cosmos DB for Apache Cassandra.
Configurazione dell'API per Cassandra
Impostare la configurazione Spark seguente nel cluster del notebook. Si tratta di un'attività una tantum.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Se si usa Spark 3.x, non è necessario installare l'helper e la factory di connessione di Azure Cosmos DB. È anche consigliabile usare remoteConnectionsPerExecutor
anziché connections_per_executor_max
per il connettore Spark 3 (vedere sopra).
Avviso
I campioni di Spark 3 illustrati in questo articolo sono stati testati con Spark versione 3.2.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.
Generatore di dati di esempio
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operazione di conteggio
API RDD
sc.cassandraTable("books_ks", "books").count
Output:
count: Long = 5
API dataframe
Il conteggio sui dataframe sono è attualmente supportato. L'esempio seguente illustra come eseguire un conteggio di dataframe dopo il salvataggio permanente del dataframe in memoria come soluzione alternativa.
Scegliere un'opzione di archiviazione tra le opzioni disponibili seguenti, per evitare di incorrere in problemi di "memoria insufficiente":
MEMORY_ONLY: questa è l'opzione di archiviazione predefinita. Archivia i set di dati RDD come oggetti Java deserializzati nella JVM. Se il set di dati RDD non entra nella memoria, alcune partizioni non vengono messe in cache e vengono ricalcolate in tempo reale ogni volta che sono necessarie.
MEMORY_AND_DISK: archivia i set di dati RDD come oggetti Java deserializzati nella JVM. Se il set di dati RDD non entra nella memoria, archiviare le partizioni che non entrano nel disco e, quando necessario, leggerle dal percorso in cui sono archiviate.
MEMORY_ONLY_SER (Java/Scala): archivia i set di dati RDD come oggetti Java serializzati, con una matrice da 1 byte per ogni partizione. Questa opzione risparmia spazio rispetto agli oggetti deserializzati, soprattutto quando si usa un serializzatore veloce, ma la lettura richiede più lavoro della CPU.
MEMORY_AND_DISK_SER (Java/Scala): questa opzione di archiviazione è simile a MEMORY_ONLY_SER, l'unica differenza è che elimina le partizioni che non entrano nella memoria del disco anziché ricalcolarle quando servono.
DISK_ONLY: archivia le partizioni RDD solo sul disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2…: come i livelli sopra, ma replica ogni partizione in due nodi del cluster.
OFF_HEAP (sperimentale): simile a MEMORY_ONLY_SER, ma archivia i dati nella memoria non heap e richiede che la memoria non heap sia attivata anticipatamente.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Operazione per la determinazione della media
API RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Output:
res24: Double = 16.016000175476073
API dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Output:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Output:
16.016000175476073
Operazione per la determinazione del minimo
API RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Output:
res31: Float = 11.33
API dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Output:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Output:
11.33
Operazione per la determinazione del massimo
API RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
API dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Output:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Output:
22.45
Operazione per la determinazione della somma
API RDD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Output:
res46: Double = 80.08000087738037
API dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Output:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Output:
80.08000087738037
Operazione per la determinazione dei primi elementi o di elementi confrontabili
API RDD
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Output:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
API dataframe
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Output:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;