다음을 통해 공유


Spark를 사용하여 Azure Cosmos DB for Apache Cassandra 테이블의 데이터 읽기

적용 대상: Cassandra

이 문서에서는 Spark에서 Azure Cosmos DB for Apache Cassandra에 저장된 데이터를 읽는 방법을 설명합니다.

API for Cassandra 구성

Notebook 클러스터에서 아래 Spark 구성을 설정합니다. 이 작업은 한 번만 수행하면 됩니다.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

참고 항목

Spark 3.x를 사용하는 경우 Azure Cosmos DB 도우미 및 연결 팩터리를 설치할 필요가 없습니다. Spark 3 커넥터에 connections_per_executor_max 대신 remoteConnectionsPerExecutor을 사용해야 합니다(위 참조).

Warning

이 문서에 표시된 Spark 3 샘플은 Spark 버전 3.2.1 및 해당 Cassandra Spark 커넥터 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0으로 테스트되었습니다. Spark 및/또는 Cassandra 커넥터의 최신 버전은 예상대로 작동하지 않을 수 있습니다.

데이터 프레임 API

session.read.format 명령을 사용하여 테이블 읽기

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

spark.read.cassandraFormat을 사용하여 테이블 읽기

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

테이블의 특정 열 읽기

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

필터 적용

최적화된 Spark 쿼리를 위해 데이터베이스에 조건자를 푸시다운할 수 있습니다. 조건자는 true 또는 false를 반환하는 쿼리의 조건으로, 일반적으로 WHERE 절에 위치합니다. 조건자 푸시다운은 데이터베이스 쿼리의 데이터를 필터링하여 데이터베이스에서 검색된 항목 수를 줄이고 쿼리 성능을 향상시킵니다. 기본값으로 Spark 데이터 세트 API는 유효한 WHERE 절을 데이터베이스에 자동으로 푸시다운합니다.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

물리적 계획의 Cassandra Filters 섹션에는 푸시다운된 필터가 포함됩니다.

파티션

RDD API

테이블 읽기

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

테이블의 특정 열 읽기

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL 보기

데이터 프레임에서 임시 보기 만들기

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

보기에 대해 쿼리 실행

select * from books_vw where book_pub_year > 1891

다음 단계

아래에는 Spark에서 Azure Cosmos DB for Apache Cassandra로 작업하는 방법을 소개하는 추가 문서가 나와 있습니다.