Spark から Azure Cosmos DB for Apache Cassandra テーブルへの集計操作
適用対象: Cassandra
この記事では、Spark から Azure Cosmos DB for Apache Cassandra のテーブルへの基本的な集計操作について説明します。
注意
Azure Cosmos DB for Apache Cassandra では、サーバー側のフィルター処理とサーバー側の集計は現在サポートされていません。
Cassandra 用 API の構成
ノートブック クラスターの Spark 構成で設定します。 これは 1 回限りのアクティビティです。
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
注意
Spark 3.x を使っている場合は、Azure Cosmos DB ヘルパーと接続ファクトリをインストールする必要はありません。 また、Spark 3 コネクタの場合は、connections_per_executor_max
ではなく remoteConnectionsPerExecutor
を使用する必要があります (上記を参照)。
警告
この記事で示される Spark 3 サンプルは、Spark バージョン 3.2.1 と、対応する Cassandra Spark Connector の com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 でテストされています。 新しいバージョンの Spark や Cassandra コネクタは、予期するとおりには機能しない場合があります。
サンプル データ ジェネレーター
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
カウント操作
RDD API
sc.cassandraTable("books_ks", "books").count
出力:
count: Long = 5
データフレーム API
データフレームに対するカウントは現在サポートされていません。 以下のサンプルは、回避策としてデータフレームをメモリに保存した後にデータフレームのカウントを実行する方法を示しています。
"メモリ不足" の問題が発生しないように回避するには、次の使用できるオプションからストレージ オプションを選択します。
MEMORY_ONLY: これは既定のストレージ オプションです。 JVM 内の逆シリアル化された Java オブジェクトとして RDD を格納します。 RDD がメモリ内に収まらない場合、一部のパーティションはキャッシュされず、必要になるたびにその場で再計算されます。
MEMORY_AND_DISK: JVM 内の逆シリアル化された Java オブジェクトとして RDD を格納します。 RDD がメモリ内に収まらない場合、収まらないパーティションをディスク上に格納し、必要に応じて、格納されている場所から読み取ります。
MEMORY_ONLY_SER (Java/Scala): シリアル化された Java オブジェクト (パーティションごとに 1 バイトの配列) として RDD を格納します。 逆シリアル化されたオブジェクトと比較すると、このオプションは、特に高速シリアライザーを使用する場合にスペース効率が良くなりますが、読み取りに使用される CPU が多くなります。
MEMORY_AND_DISK_SER (Java/Scala): このストレージ オプションは MEMORY_ONLY_SER と似ていますが、唯一の違いは、必要に応じて再計算が実行されるのではなく、ディスク メモリ内に収まらないパーティションのスピルが発生することです。
DISK_ONLY: RDD パーティションをディスク上にのみ格納します。
MEMORY_ONLY_2、MEMORY_AND_DISK_2...: 上記のレベルと同じですが、各パーティションをクラスタ内の 2 つのノードにレプリケートします。
OFF_HEAP (実験的): MEMORY_ONLY_SER に似ていますが、データをオフヒープ メモリに格納します。事前にオフヒープ メモリを有効にする必要があります。
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
平均操作
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
出力:
res24: Double = 16.016000175476073
データフレーム API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
出力:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
出力:
16.016000175476073
最小値操作
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
出力:
res31: Float = 11.33
データフレーム API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
出力:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
出力:
11.33
最大値操作
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
データフレーム API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
出力:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
出力:
22.45
合計操作
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
出力:
res46: Double = 80.08000087738037
データフレーム API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
出力:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
出力:
80.08000087738037
上位または同等操作
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
出力:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
データフレーム API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
出力:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;