Spark からの Azure Cosmos DB for Apache Cassandra の DDL 操作
適用対象: Cassandra
この記事では、Spark からの Azure Cosmos DB for Apache Cassandra に対するキースペースとテーブルの DDL 操作について詳しく説明します。
Spark コンテキスト
Cassandra 用 API のコネクタには、spark コンテキストの一部として、Cassandra への接続の詳細の初期化が必要です。 ノートブックを起動すると、Spark コンテキストは既に初期化されています。停止して再初期化することはお勧めしません。 解決方法の 1 つは、クラスターの spark 構成で、Cassandra 用 API インスタンス構成をクラスター レベルで追加することです。 これは、クラスターごとに 1 回限りのアクティビティです。 Spark の構成情報に、以下のコードをスペース区切りのキー値のペアとして追加します。
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Cassandra 用 API に関連する構成
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
注意
Spark 3.x を使っている場合は、Azure Cosmos DB ヘルパーと接続ファクトリをインストールする必要はありません。 また、Spark 3 コネクタの場合は、connections_per_executor_max
ではなく remoteConnectionsPerExecutor
を使用する必要があります (上記を参照)。
警告
この記事で示される Spark 3 サンプルは、Spark バージョン 3.2.1 と、対応する Cassandra Spark Connector の com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 でテストされています。 それより後のバージョンの Spark や Cassandra コネクタは、予期するとおりには機能しない場合があります。
キースペースの DDL 操作
キースペースを作成する
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
cqlsh で検証する
cqlsh で次のコマンドを実行すると、先ほど作成したキースペースが表示されます。
DESCRIBE keyspaces;
キースペースを削除する
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
cqlsh で検証する
DESCRIBE keyspaces;
テーブルの DDL 操作
考慮事項:
- create table ステートメントを使用して、テーブル レベルでスループットを割り当てることができます。
- 1 つのパーティション キーで、20 GB のデータを格納できます。
- 1 つのレコードで、最大 2 MB のデータを格納できます。
- 1 つのパーティション キー範囲で、複数のパーティション キーを格納できます。
テーブルを作成する
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
cqlsh で検証する
cqlsh で次のコマンドを実行すると、"books" という名前のテーブルが表示されます。
USE books_ks;
DESCRIBE books;
前のコマンドの出力には、プロビジョニングされたスループットと既定の TTL の値は表示されません。これらの値はポータルで確認できます。
テーブルを変更する
alter table コマンドを使用して、次の値を変更できます。
- プロビジョニングされているスループット
- Time-To-Live の値
現在、列の変更はサポートされていません。
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
テーブルの削除
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cqlsh で検証する
cqlsh で次のコマンドを実行すると、"books" テーブルが使用できなくなったことがわかります。
USE books_ks;
DESCRIBE tables;
次のステップ
キースペースとテーブルを作成した後は、CRUD 操作などに関する次の記事に進んでください。