Aggregate operations on Azure Cosmos DB for Apache Cassandra tables from Spark
APPLIES TO: Cassandra
This article describes basic aggregation operations against Azure Cosmos DB for Apache Cassandra tables from Spark.
Note
Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB for Apache Cassandra.
API for Cassandra configuration
Set the following Spark configuration in your notebook cluster. This is a one-time activity.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Note
If you are using Spark 3.x, you don't need to install the Azure Cosmos DB helper and connection factory. You should also use remoteConnectionsPerExecutor instead of connections_per_executor_max for the Spark 3 connector (shown above).
Warning
The Spark 3 samples shown in this article have been tested with Spark version 3.2.1 and the corresponding Cassandra Spark connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Later versions of Spark and/or the Cassandra connector may not function as expected.
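If you run these samples outside a managed notebook, the same connection settings can be supplied when the Spark session is built. The following is a minimal sketch, assuming the connector package listed above is already attached to the cluster (for example, as a cluster library); the account values are placeholders.
import org.apache.spark.sql.SparkSession

// Minimal sketch (assumption: the connector assembly is already on the classpath).
// Applies the same connection settings shown above programmatically.
val spark = SparkSession.builder()
  .appName("cosmos-cassandra-aggregations")
  .config("spark.cassandra.connection.host", "YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
  .config("spark.cassandra.connection.port", "10350")
  .config("spark.cassandra.connection.ssl.enabled", "true")
  .config("spark.cassandra.auth.username", "YOUR_ACCOUNT_NAME")
  .config("spark.cassandra.auth.password", "YOUR_ACCOUNT_KEY")
  .getOrCreate()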
Sample data generator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
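The write above assumes the books_ks keyspace and the books table already exist. If they don't, a minimal sketch of the one-time setup through the connector session is shown below; the replication settings and the primary key choice (book_id, book_pub_year) are assumptions for illustration.
// Hypothetical one-time setup for the keyspace and table targeted by the write above.
// The replication settings and primary key (book_id, book_pub_year) are illustrative assumptions.
val cdbConnector = CassandraConnector(sc.getConf)
cdbConnector.withSessionDo(session => {
  session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};")
  session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT, book_author TEXT, book_name TEXT, book_pub_year INT, book_price FLOAT, PRIMARY KEY(book_id, book_pub_year));")
})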
Count operation
RDD API
sc.cassandraTable("books_ks", "books").count
Output:
count: Long = 5
Dataframe API
Count on dataframes is currently not supported. The sample below shows how to execute a dataframe count after persisting the dataframe to memory as a workaround.
To avoid running into "out of memory" issues, choose one of the following available storage options:
MEMORY_ONLY: This is the default storage option. Stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, some partitions won't be cached and will be recomputed on the fly each time they're needed.
MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD doesn't fit in memory, the partitions that don't fit are stored on disk and read from there whenever they're needed.
MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects (one byte array per partition). This option is more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java/Scala): This storage option is like MEMORY_ONLY_SER; the only difference is that partitions that don't fit in memory are spilled to disk instead of being recomputed when they're needed.
DISK_ONLY: Stores the RDD partitions on disk only.
MEMORY_ONLY_2, MEMORY_AND_DISK_2…: Same as the levels above, but each partition is replicated on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory, which requires off-heap memory to be enabled beforehand.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
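The workaround above pins the dataframe with MEMORY_ONLY. If the table doesn't fit in executor memory, one of the other storage levels listed earlier can be substituted; a brief sketch using a serialized level that spills to disk:
// Sketch: re-cache with a serialized, disk-spilling level instead of MEMORY_ONLY.
readBooksDF.unpersist()
readBooksDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
readBooksDF.count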
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Average operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Output:
res24: Double = 16.016000175476073
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Output:
+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+
SQL
select avg(book_price) from books_vw;
Output:
16.016000175476073
Minimum operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Output:
res31: Float = 11.33
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Output:
+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+
SQL
%sql
select min(book_price) from books_vw;
Output:
11.33
Maximum operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Output:
+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+
SQL
%sql
select max(book_price) from books_vw;
Output:
22.45
Sum operation
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Output:
res46: Double = 80.08000087738037
Dataframe API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Output:
+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+
SQL
select sum(book_price) from books_vw;
Output:
80.08000087738037
Top or comparison operation
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Output:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
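An alternative sketch for the same top-n result: takeOrdered returns only the top rows to the driver without materializing a fully sorted RDD; the descending order is expressed by negating the price key.
// Sketch: top 3 books by price, ordered descending via a negated key.
sc.cassandraTable("books_ks", "books")
  .select("book_name", "book_price")
  .takeOrdered(3)(Ordering.by((r: com.datastax.spark.connector.CassandraRow) => -r.getFloat("book_price")))
  .foreach(println)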
Dataframe API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Output:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;