Spark의 Azure Cosmos DB for Apache Cassandra에서 DDL 작업
적용 대상: Cassandra
이 문서에서는 Spark에서 Azure Cosmos DB for Apache Cassandra에 대한 키스페이스 및 테이블 DDL 작업을 자세히 설명합니다.
Spark 컨텍스트
API for Cassandra용 커넥터에서는 Spark 컨텍스트의 일부분으로 Cassandra 연결 세부 정보를 초기화해야 합니다. Notebook을 시작하면 Spark 컨텍스트가 이미 초기화되어 있으므로 중지했다가 다시 초기화하는 것은 권하지 않습니다. 이러한 경우에 사용할 수 있는 방법 중 하나는 클러스터 Spark 구성에서 클러스터 수준에 API for Cassandra 인스턴스 구성을 추가하는 것입니다. 이 작업은 클러스터당 한 번씩만 수행하면 됩니다. 다음 코드를 공백으로 구분된 키-값 쌍으로 Spark 구성에 추가합니다.
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
API for Cassandra 관련 구성
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
참고 항목
Spark 3.x를 사용하는 경우 Azure Cosmos DB 도우미 및 연결 팩터리를 설치할 필요가 없습니다. Spark 3 커넥터에 connections_per_executor_max
대신 remoteConnectionsPerExecutor
을 사용해야 합니다(위 참조).
Warning
이 문서에 표시된 Spark 3 샘플은 Spark 버전 3.2.1 및 해당 Cassandra Spark 커넥터 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1로 테스트되었습니다. Spark 및/또는 Cassandra 커넥터의 최신 버전은 예상대로 작동하지 않을 수 있습니다.
키스페이스 DDL 작업
키스페이스 만들기
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
cqlsh에서 유효성 검사
cqlsh에서 다음 명령을 실행하고 앞에서 만든 키스페이스가 표시되어야 합니다.
DESCRIBE keyspaces;
키스페이스 삭제
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
cqlsh에서 유효성 검사
DESCRIBE keyspaces;
테이블 DDL 작업
고려 사항:
- create table 문을 사용하여 테이블 수준에서 처리량을 할당할 수 있습니다.
- 하나의 파티션 키는 20GB의 데이터를 저장할 수 있습니다.
- 하나의 레코드는 최대 2MB의 데이터를 저장할 수 있습니다.
- 하나의 파티션 키 범위는 여러 파티션 키를 저장할 수 있습니다.
테이블 만들기
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
cqlsh에서 유효성 검사
cqlsh에서 다음 명령을 실행하고 "books"라는 테이블이 표시되어야 합니다.
USE books_ks;
DESCRIBE books;
프로비전된 처리량 및 기본 TTL 값은 이전 명령의 출력에 표시되지 않습니다. 포털에서 이러한 값을 가져올 수 있습니다.
alter table
alter table 명령을 사용하여 다음 값을 변경할 수 있습니다.
- 프로비저닝된 처리량
- TTL(Time to Live) 값
열 변경은 현재 지원되지 않습니다.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
테이블 삭제
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cqlsh에서 유효성 검사
cqlsh에서 다음 명령을 실행하고 "books" 테이블이 더 이상 지원되지 않는다고 표시되어야 합니다.
USE books_ks;
DESCRIBE tables;
다음 단계
키스페이스 및 테이블을 만들면 CRUD 작업 등에 대한 다음 문서를 진행합니다.