Spark에서 Azure Cosmos DB for Apache Cassandra에 데이터 만들기/삽입
적용 대상: Cassandra
이 문서에서는 Spark에서 Azure Cosmos DB for Apache Cassandra의 테이블에 샘플 데이터를 삽입하는 방법을 설명합니다.
API for Cassandra 구성
Notebook 클러스터에서 아래 Spark 구성을 설정합니다. 이 작업은 한 번만 수행하면 됩니다.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
참고 항목
Spark 3.x를 사용하는 경우 Azure Cosmos DB 도우미 및 연결 팩터리를 설치할 필요가 없습니다. Spark 3 커넥터에 connections_per_executor_max
대신 remoteConnectionsPerExecutor
을 사용해야 합니다(위 참조).
Warning
이 문서에 표시된 Spark 3 샘플은 Spark 버전 3.2.1 및 해당 Cassandra Spark 커넥터 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0으로 테스트되었습니다. Spark 및/또는 Cassandra 커넥터의 최신 버전은 예상대로 작동하지 않을 수 있습니다.
데이터 프레임 API
샘플 데이터를 사용하여 데이터 프레임 만들기
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a dataframe containing five records
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")
//Review schema
booksDF.printSchema
//Print
booksDF.show
참고 항목
행 수준에서 “없는 경우 만들기” 기능은 아직 지원되지 않습니다.
Azure Cosmos DB for Apache Cassandra에 유지
또한 데이터를 저장할 때 다음 예제와 같이 Time-to-Live 및 일관성 정책 설정을 설정할 수 있습니다.
//Persist
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
cqlsh에서 유효성 검사
use books_ks;
select * from books;
RDD(복원력 있는 분산된 데이터베이스) API
샘플 데이터로 RDD 만들기
//Drop and re-create table to delete records created in the previous section
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
//Create RDD
val booksRDD = sc.parallelize(Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))
//Review
booksRDD.take(2).foreach(println)
참고 항목
없는 경우 만들기 기능은 아직 지원되지 않습니다.
Azure Cosmos DB for Apache Cassandra에 유지
API for Cassandra에 데이터를 저장할 때 다음 예와 같이 TTL(Time-to-Live) 및 일관성 정책 설정을 지정할 수도 있습니다.
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel
//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))
cqlsh에서 유효성 검사
use books_ks;
select * from books;
다음 단계
Azure Cosmos DB for Apache Cassandra 테이블에 데이터를 삽입한 후 다음 문서를 진행하여 Azure Cosmos DB for Apache Cassandra에 저장된 데이터에 대해 다른 작업을 수행합니다.