Operacje agregowania w usłudze Azure Cosmos DB dla tabel apache Cassandra z platformy Spark
DOTYCZY: Kasandra
W tym artykule opisano podstawowe operacje agregacji względem tabel usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark.
Uwaga
Filtrowanie po stronie serwera i agregacja po stronie serwera nie jest obecnie obsługiwana w usłudze Azure Cosmos DB dla systemu Apache Cassandra.
Interfejs API dla konfiguracji bazy danych Cassandra
Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Uwaga
Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor
zamiast connections_per_executor_max
łącznika Spark 3 (zobacz powyżej).
Ostrzeżenie
Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.
Przykładowy generator danych
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operacja licznika
RDD API
sc.cassandraTable("books_ks", "books").count
Wyjście:
count: Long = 5
Interfejs API ramki danych
Liczba ramek danych nie jest obecnie obsługiwana. W poniższym przykładzie pokazano, jak wykonać liczbę ramek danych po utrwalonej ramce danych w pamięci jako obejście problemu.
Wybierz opcję magazynu z następujących dostępnych opcji, aby uniknąć problemów z brakiem pamięci:
MEMORY_ONLY: jest to domyślna opcja magazynu. Przechowuje rdD jako deserializowane obiekty Java w maszynie JVM. Jeśli RDD nie mieści się w pamięci, niektóre partycje nie będą buforowane i są ponownie obliczane na bieżąco za każdym razem, gdy są potrzebne.
MEMORY_AND_DISK: przechowuje RDD jako deserializowane obiekty Java w maszynie JVM. Jeśli RDD nie mieści się w pamięci, zapisz partycje, które nie mieszczą się na dysku i zawsze, gdy jest to wymagane, odczytaj je z lokalizacji, w której są przechowywane.
MEMORY_ONLY_SER (Java/Scala): przechowuje RDD jako serializowane obiekty Java — 1-bajtową tablicę na partycję. Ta opcja jest wydajna w porównaniu z deserializowanymi obiektami, zwłaszcza w przypadku korzystania z szybkiego serializatora, ale intensywniejszego odczytu procesora CPU.
MEMORY_AND_DISK_SER (Java/Scala): Ta opcja magazynu przypomina MEMORY_ONLY_SER, jedyną różnicą jest to, że rozla partycje, które nie mieszczą się w pamięci dysku, zamiast ponownie je skompilować, gdy są potrzebne.
DISK_ONLY: przechowuje tylko partycje RDD na dysku.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: takie same jak powyższe poziomy, ale replikuje każdą partycję w dwóch węzłach klastra.
OFF_HEAP (eksperymentalna): podobnie jak MEMORY_ONLY_SER, ale przechowuje dane w pamięci poza stertą i wymaga włączenia pamięci poza stertą.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Średnia operacja
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Wyjście:
res24: Double = 16.016000175476073
Interfejs API ramki danych
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Wyjście:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Wyjście:
16.016000175476073
Minimalna operacja
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Wyjście:
res31: Float = 11.33
Interfejs API ramki danych
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Wyjście:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Wyjście:
11.33
Maksymalna operacja
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Interfejs API ramki danych
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Wyjście:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Wyjście:
22.45
Operacja sumowania
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Wyjście:
res46: Double = 80.08000087738037
Interfejs API ramki danych
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Wyjście:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Wyjście:
80.08000087738037
Operacja top lub porównywalna
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Wyjście:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Interfejs API ramki danych
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Wyjście:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;