次の方法で共有


Spark から Azure Cosmos DB for Apache Cassandra テーブルへの集計操作

適用対象: Cassandra

この記事では、Spark から Azure Cosmos DB for Apache Cassandra のテーブルへの基本的な集計操作について説明します。

注意

Azure Cosmos DB for Apache Cassandra では、サーバー側のフィルター処理とサーバー側の集計は現在サポートされていません。

Cassandra 用 API の構成

ノートブック クラスターの Spark 構成で設定します。 これは 1 回限りのアクティビティです。

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

注意

Spark 3.x を使っている場合は、Azure Cosmos DB ヘルパーと接続ファクトリをインストールする必要はありません。 また、Spark 3 コネクタの場合は、connections_per_executor_max ではなく remoteConnectionsPerExecutor を使用する必要があります (上記を参照)。

警告

この記事で示される Spark 3 サンプルは、Spark バージョン 3.2.1 と、対応する Cassandra Spark Connector の com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 でテストされています。 新しいバージョンの Spark や Cassandra コネクタは、予期するとおりには機能しない場合があります。

サンプル データ ジェネレーター

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

カウント操作

RDD API

sc.cassandraTable("books_ks", "books").count

出力:

count: Long = 5

データフレーム API

データフレームに対するカウントは現在サポートされていません。 以下のサンプルは、回避策としてデータフレームをメモリに保存した後にデータフレームのカウントを実行する方法を示しています。

"メモリ不足" の問題が発生しないように回避するには、次の使用できるオプションからストレージ オプションを選択します。

  • MEMORY_ONLY: これは既定のストレージ オプションです。 JVM 内の逆シリアル化された Java オブジェクトとして RDD を格納します。 RDD がメモリ内に収まらない場合、一部のパーティションはキャッシュされず、必要になるたびにその場で再計算されます。

  • MEMORY_AND_DISK: JVM 内の逆シリアル化された Java オブジェクトとして RDD を格納します。 RDD がメモリ内に収まらない場合、収まらないパーティションをディスク上に格納し、必要に応じて、格納されている場所から読み取ります。

  • MEMORY_ONLY_SER (Java/Scala): シリアル化された Java オブジェクト (パーティションごとに 1 バイトの配列) として RDD を格納します。 逆シリアル化されたオブジェクトと比較すると、このオプションは、特に高速シリアライザーを使用する場合にスペース効率が良くなりますが、読み取りに使用される CPU が多くなります。

  • MEMORY_AND_DISK_SER (Java/Scala): このストレージ オプションは MEMORY_ONLY_SER と似ていますが、唯一の違いは、必要に応じて再計算が実行されるのではなく、ディスク メモリ内に収まらないパーティションのスピルが発生することです。

  • DISK_ONLY: RDD パーティションをディスク上にのみ格納します。

  • MEMORY_ONLY_2、MEMORY_AND_DISK_2...: 上記のレベルと同じですが、各パーティションをクラスタ内の 2 つのノードにレプリケートします。

  • OFF_HEAP (実験的): MEMORY_ONLY_SER に似ていますが、データをオフヒープ メモリに格納します。事前にオフヒープ メモリを有効にする必要があります。

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

平均操作

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

出力:

res24: Double = 16.016000175476073

データフレーム API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

出力:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

出力:

16.016000175476073

最小値操作

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

出力:

res31: Float = 11.33

データフレーム API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

出力:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

出力:

11.33

最大値操作

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

データフレーム API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

出力:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

出力:

22.45

合計操作

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

出力:

res46: Double = 80.08000087738037

データフレーム API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

出力:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

出力:

80.08000087738037

上位または同等操作

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

出力:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

データフレーム API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

出力:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

次のステップ