Operaciones de agregado en tablas de Azure Cosmos DB for Apache Cassandra de Spark
SE APLICA A: Cassandra
En este artículo se describen operaciones de agregación básicas en tablas de Azure Cosmos DB for Apache Cassandra de Spark.
Nota
Actualmente no se admiten el filtrado ni la agregación del lado del servidor en Azure Cosmos DB for Apache Cassandra.
Configuración de la API para Cassandra
Establezca la configuración de Spark siguiente en el clúster del cuaderno. Es una actividad única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor
en lugar de connections_per_executor_max
para el conector de Spark 3 (consulte más arriba).
Advertencia
Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.
Generador de datos de ejemplo
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operación de recuento
RDD API
sc.cassandraTable("books_ks", "books").count
Salida:
count: Long = 5
Dataframe API
Actualmente no se admite el recuento de tramas de datos. En el ejemplo siguiente se muestra cómo ejecutar un recuento de tramas de datos después de guardar la trama de datos en la memoria como una solución alternativa.
Elija una de las siguientes opciones de almacenamiento para no tener problemas de "memoria insuficiente":
MEMORY_ONLY: se trata de la opción de almacenamiento predeterminada. Almacena RDD como objetos Java deserializados en la máquina virtual Java. Si el RDD no cabe en la memoria, algunas particiones no se almacenarán en caché y se volverán a calcular sobre la marcha cada vez que se necesiten.
MEMORY_AND_DISK: almacena RDD como objetos Java deserializados en la máquina virtual Java. Si el RDD no cabe en la memoria, almacene las particiones que no entran en el disco y, cuando sea necesario, léalos desde la ubicación en la que estén almacenados.
MEMORY_ONLY_SER (Java/Scala): almacena RDD como objetos Java serializados: una matriz de un byte por partición. Con esta opción el espacio es más eficiente en comparación con objetos deserializados, especialmente al utilizar un serializador rápido, pero más intensivo en la CPU a la hora de leer.
MEMORY_AND_DISK_SER (Java/Scala): esta opción de almacenamiento es similar a MEMORY_ONLY_SER, con la única diferencia de que deja desbordar las particiones que no caben en la memoria del disco en lugar de recalcularlas cuando es necesario.
DISK_ONLY: solo almacena las particiones RDD en el disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: igual que los niveles anteriores, pero replica cada partición en dos nodos de clúster.
OFF_HEAP (experimental): es similar a MEMORY_ONLY_SER, pero almacena los datos en la memoria fuera del montón y requiere que la memoria fuera del montón esté habilitada de antemano.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Operación de promedio
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Salida:
res24: Double = 16.016000175476073
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Salida:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Salida:
16.016000175476073
Operación mínima
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Salida:
res31: Float = 11.33
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Salida:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Salida:
11.33
Operación máxima
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Salida:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Salida:
22.45
Operación de suma
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Salida:
res46: Double = 80.08000087738037
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Salida:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Salida:
80.08000087738037
Operación comparable o superior
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Salida:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Dataframe API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Salida:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;
Paso siguiente
Table copy operations (Operaciones de copia en tabla)