Spark를 사용하여 Azure Cosmos DB for Apache Cassandra 테이블의 데이터 읽기
적용 대상: Cassandra
이 문서에서는 Spark에서 Azure Cosmos DB for Apache Cassandra에 저장된 데이터를 읽는 방법을 설명합니다.
API for Cassandra 구성
Notebook 클러스터에서 아래 Spark 구성을 설정합니다. 이 작업은 한 번만 수행하면 됩니다.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
참고 항목
Spark 3.x를 사용하는 경우 Azure Cosmos DB 도우미 및 연결 팩터리를 설치할 필요가 없습니다. Spark 3 커넥터에 connections_per_executor_max
대신 remoteConnectionsPerExecutor
을 사용해야 합니다(위 참조).
Warning
이 문서에 표시된 Spark 3 샘플은 Spark 버전 3.2.1 및 해당 Cassandra Spark 커넥터 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0으로 테스트되었습니다. Spark 및/또는 Cassandra 커넥터의 최신 버전은 예상대로 작동하지 않을 수 있습니다.
데이터 프레임 API
session.read.format 명령을 사용하여 테이블 읽기
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
spark.read.cassandraFormat을 사용하여 테이블 읽기
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
테이블의 특정 열 읽기
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
필터 적용
최적화된 Spark 쿼리를 위해 데이터베이스에 조건자를 푸시다운할 수 있습니다. 조건자는 true 또는 false를 반환하는 쿼리의 조건으로, 일반적으로 WHERE 절에 위치합니다. 조건자 푸시다운은 데이터베이스 쿼리의 데이터를 필터링하여 데이터베이스에서 검색된 항목 수를 줄이고 쿼리 성능을 향상시킵니다. 기본값으로 Spark 데이터 세트 API는 유효한 WHERE 절을 데이터베이스에 자동으로 푸시다운합니다.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
물리적 계획의 Cassandra Filters
섹션에는 푸시다운된 필터가 포함됩니다.
RDD API
테이블 읽기
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
테이블의 특정 열 읽기
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
SQL 보기
데이터 프레임에서 임시 보기 만들기
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
보기에 대해 쿼리 실행
select * from books_vw where book_pub_year > 1891
다음 단계
아래에는 Spark에서 Azure Cosmos DB for Apache Cassandra로 작업하는 방법을 소개하는 추가 문서가 나와 있습니다.