次の方法で共有


Spark からの Azure Cosmos DB for Apache Cassandra の DDL 操作

適用対象: Cassandra

この記事では、Spark からの Azure Cosmos DB for Apache Cassandra に対するキースペースとテーブルの DDL 操作について詳しく説明します。

Spark コンテキスト

Cassandra 用 API のコネクタには、spark コンテキストの一部として、Cassandra への接続の詳細の初期化が必要です。 ノートブックを起動すると、Spark コンテキストは既に初期化されています。停止して再初期化することはお勧めしません。 解決方法の 1 つは、クラスターの spark 構成で、Cassandra 用 API インスタンス構成をクラスター レベルで追加することです。 これは、クラスターごとに 1 回限りのアクティビティです。 Spark の構成情報に、以下のコードをスペース区切りのキー値のペアとして追加します。

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

注意

Spark 3.x を使っている場合は、Azure Cosmos DB ヘルパーと接続ファクトリをインストールする必要はありません。 また、Spark 3 コネクタの場合は、connections_per_executor_max ではなく remoteConnectionsPerExecutor を使用する必要があります (上記を参照)。

警告

この記事で示される Spark 3 サンプルは、Spark バージョン 3.2.1 と、対応する Cassandra Spark Connector の com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 でテストされています。 それより後のバージョンの Spark や Cassandra コネクタは、予期するとおりには機能しない場合があります。

キースペースの DDL 操作

キースペースを作成する

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

cqlsh で検証する

cqlsh で次のコマンドを実行すると、先ほど作成したキースペースが表示されます。

DESCRIBE keyspaces;

キースペースを削除する

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

cqlsh で検証する

DESCRIBE keyspaces;

テーブルの DDL 操作

考慮事項:

  • create table ステートメントを使用して、テーブル レベルでスループットを割り当てることができます。
  • 1 つのパーティション キーで、20 GB のデータを格納できます。
  • 1 つのレコードで、最大 2 MB のデータを格納できます。
  • 1 つのパーティション キー範囲で、複数のパーティション キーを格納できます。

テーブルを作成する

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

cqlsh で検証する

cqlsh で次のコマンドを実行すると、"books" という名前のテーブルが表示されます。

USE books_ks;
DESCRIBE books;

前のコマンドの出力には、プロビジョニングされたスループットと既定の TTL の値は表示されません。これらの値はポータルで確認できます。

テーブルを変更する

alter table コマンドを使用して、次の値を変更できます。

  • プロビジョニングされているスループット
  • Time-To-Live の値
    現在、列の変更はサポートされていません。
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

テーブルの削除

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cqlsh で検証する

cqlsh で次のコマンドを実行すると、"books" テーブルが使用できなくなったことがわかります。

USE books_ks;
DESCRIBE tables;

次のステップ

キースペースとテーブルを作成した後は、CRUD 操作などに関する次の記事に進んでください。