다음을 통해 공유


Spark에서 Azure Cosmos DB for Apache Cassandra 테이블에 대한 집계 연산

적용 대상: Cassandra

이 문서에서는 Spark에서 Azure Cosmos DB for Apache Cassandra 테이블에 대한 기본적인 집계 연산에 대해 설명합니다.

참고 항목

Azure Cosmos DB for Apache Cassandra에서는 현재 서버 쪽 필터링과 서버 쪽 집계가 지원되지 않습니다.

API for Cassandra 구성

Notebook 클러스터에서 아래 Spark 구성을 설정합니다. 이 작업은 한 번만 수행하면 됩니다.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

참고 항목

Spark 3.x를 사용하는 경우 Azure Cosmos DB 도우미 및 연결 팩터리를 설치할 필요가 없습니다. Spark 3 커넥터에 connections_per_executor_max 대신 remoteConnectionsPerExecutor을 사용해야 합니다(위 참조).

Warning

이 문서에 표시된 Spark 3 샘플은 Spark 버전 3.2.1 및 해당 Cassandra Spark 커넥터 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0으로 테스트되었습니다. Spark 및/또는 Cassandra 커넥터의 최신 버전은 예상대로 작동하지 않을 수 있습니다.

샘플 데이터 생성기

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

개수(Count) 연산

RDD API

sc.cassandraTable("books_ks", "books").count

출력:

count: Long = 5

데이터 프레임 API

데이터 프레임 수 계산은 현재 지원되지 않습니다. 아래 샘플은 임시 해결책으로 데이터 프레임을 메모리에 지속시킨 후 데이터 프레임 수 계산을 실행하는 방법을 보여줍니다.

"메모리 부족" 문제가 발생하지 않도록 다음 옵션 중에서 스토리지 옵션을 선택합니다.

  • MEMORY_ONLY: 기본 스토리지 옵션입니다. RDD를 JVM에 역직렬화된 Java 개체로 저장합니다. RDD가 메모리에 맞지 않으면 파티션이 캐시되지 않고 필요할 때마다 즉석에서 다시 계산됩니다.

  • MEMORY_AND_DISK: RDD를 JVM에 역직렬화된 Java 개체로 저장합니다. RDD가 메모리에 맞지 않으면 디스크에 맞지 않는 파티션을 저장하고 필요할 때마다 저장된 위치에서 읽어 들입니다.

  • MEMORY_ONLY_SER(Java/Scala): RDD를 직렬화된 Java 개체(파티션 당 1바이트 배열)로 저장합니다. 이 옵션은 역직렬화된 개체에 비해 공간 효율적이며, 특히 고속 직렬 변환기를 사용하는 경우에는 더 그렇습니다. 단, 읽기 작업에 CPU가 많이 사용됩니다.

  • MEMORY_AND_DISK_SER(Java/Scala): 이 스토리지 옵션은 MEMORY_ONLY_SER와 비슷합니다. 유일한 차이점은 필요할 때 디스크 메모리를 다시 계산하지 않고 디스크 메모리에 맞지 않는 파티션을 분산하는 것입니다.

  • DISK_ONLY: RDD 파티션을 디스크에만 저장합니다.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: 위의 수준과 같지만 두 개의 클러스터 노드에 있는 각 파티션을 복제합니다.

  • OFF_HEAP(실험적임): MEMORY_ONLY_SER과 유사하지만 오프-힙(off-heap) 메모리에 데이터를 저장하므로 오프-힙(off-heap) 메모리를 사용하도록 미리 설정해야 합니다.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

평균(Average) 연산

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

출력:

res24: Double = 16.016000175476073

데이터 프레임 API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

출력:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

출력:

16.016000175476073

최솟값(Min) 연산

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

출력:

res31: Float = 11.33

데이터 프레임 API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

출력:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

출력:

11.33

최댓값(Max) 연산

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

데이터 프레임 API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

출력:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

출력:

22.45

합계(Sum) 연산

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

출력:

res46: Double = 80.08000087738037

데이터 프레임 API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

출력:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

출력:

80.08000087738037

상위 또는 유사한 연산

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

출력:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

데이터 프레임 API

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

출력:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

다음 단계