次の方法で共有


Spark を使用して Azure Cosmos DB for Apache Cassandra テーブルからデータを読み取る

適用対象: Cassandra

この記事では、Spark から Azure Cosmos DB for Apache Cassandra に格納されたデータを読み取る方法について説明します。

Cassandra 用 API の構成

ノートブック クラスターの Spark 構成で設定します。 これは 1 回限りのアクティビティです。

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

注意

Spark 3.x を使っている場合は、Azure Cosmos DB ヘルパーと接続ファクトリをインストールする必要はありません。 また、Spark 3 コネクタの場合は、connections_per_executor_max ではなく remoteConnectionsPerExecutor を使用する必要があります (上記を参照)。

警告

この記事で示される Spark 3 サンプルは、Spark バージョン 3.2.1 と、対応する Cassandra Spark Connector の com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 でテストされています。 それより後のバージョンの Spark や Cassandra コネクタは、予期するとおりには機能しない場合があります。

データフレーム API

session.read.format コマンドを使用してテーブルを読み取る

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

spark.read.cassandraFormat を使用してテーブルを読み取る

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

テーブル内の特定の列を読み取る

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

フィルターを適用する

データベースに述語をプッシュ ダウンすると、より適切に最適化された Spark クエリを実行できます。 述語とは true または false を返すクエリに対する条件で、通常は WHERE 句に配置されます。 述語プッシュ ダウンは、データベース クエリのデータをフィルター処理することで、データベースから取得されるエントリの数を減らしてクエリのパフォーマンスを向上させます。 既定では、Spark データセット API は有効な WHERE 句をデータベースに自動的にプッシュダウンします。

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

物理プランの Cassandra Filters セクションには、プッシュ ダウン フィルターも含まれます。

パーティション

RDD API

テーブルを読み取る

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

テーブル内の特定の列を読み取る

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL ビュー

データフレームから一時ビューを作成する

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

ビューに対してクエリを実行する

select * from books_vw where book_pub_year > 1891

次のステップ

Spark からの Azure Cosmos DB for Apache Cassandra の操作に関するその他の記事を以下に示します。