Udostępnij za pośrednictwem


Operacje agregowania w usłudze Azure Cosmos DB dla tabel apache Cassandra z platformy Spark

DOTYCZY: Kasandra

W tym artykule opisano podstawowe operacje agregacji względem tabel usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark.

Uwaga

Filtrowanie po stronie serwera i agregacja po stronie serwera nie jest obecnie obsługiwana w usłudze Azure Cosmos DB dla systemu Apache Cassandra.

Interfejs API dla konfiguracji bazy danych Cassandra

Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
 spark.cassandra.connection.port  10350
 spark.cassandra.connection.ssl.enabled  true
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000
 spark.cassandra.concurrent.reads  512
 spark.cassandra.output.batch.grouping.buffer.size  1000
 spark.cassandra.connection.keep_alive_ms  600000000

Uwaga

Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor zamiast connections_per_executor_max łącznika Spark 3 (zobacz powyżej).

Ostrzeżenie

Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.

Przykładowy generator danych

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Operacja licznika

RDD API

sc.cassandraTable("books_ks", "books").count

Wyjście:

count: Long = 5

Interfejs API ramki danych

Liczba ramek danych nie jest obecnie obsługiwana. W poniższym przykładzie pokazano, jak wykonać liczbę ramek danych po utrwalonej ramce danych w pamięci jako obejście problemu.

Wybierz opcję magazynu z następujących dostępnych opcji, aby uniknąć problemów z brakiem pamięci:

  • MEMORY_ONLY: jest to domyślna opcja magazynu. Przechowuje rdD jako deserializowane obiekty Java w maszynie JVM. Jeśli RDD nie mieści się w pamięci, niektóre partycje nie będą buforowane i są ponownie obliczane na bieżąco za każdym razem, gdy są potrzebne.

  • MEMORY_AND_DISK: przechowuje RDD jako deserializowane obiekty Java w maszynie JVM. Jeśli RDD nie mieści się w pamięci, zapisz partycje, które nie mieszczą się na dysku i zawsze, gdy jest to wymagane, odczytaj je z lokalizacji, w której są przechowywane.

  • MEMORY_ONLY_SER (Java/Scala): przechowuje RDD jako serializowane obiekty Java — 1-bajtową tablicę na partycję. Ta opcja jest wydajna w porównaniu z deserializowanymi obiektami, zwłaszcza w przypadku korzystania z szybkiego serializatora, ale intensywniejszego odczytu procesora CPU.

  • MEMORY_AND_DISK_SER (Java/Scala): Ta opcja magazynu przypomina MEMORY_ONLY_SER, jedyną różnicą jest to, że rozla partycje, które nie mieszczą się w pamięci dysku, zamiast ponownie je skompilować, gdy są potrzebne.

  • DISK_ONLY: przechowuje tylko partycje RDD na dysku.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: takie same jak powyższe poziomy, ale replikuje każdą partycję w dwóch węzłach klastra.

  • OFF_HEAP (eksperymentalna): podobnie jak MEMORY_ONLY_SER, ale przechowuje dane w pamięci poza stertą i wymaga włączenia pamięci poza stertą.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Średnia operacja

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Wyjście:

res24: Double = 16.016000175476073

Interfejs API ramki danych

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Wyjście:

+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |

SQL

select avg(book_price) from books_vw;

Wyjście:

16.016000175476073

Minimalna operacja

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Wyjście:

res31: Float = 11.33

Interfejs API ramki danych

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Wyjście:

+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |

SQL

%sql
select avg(book_price) from books_vw;

Wyjście:

11.33

Maksymalna operacja

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Interfejs API ramki danych

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Wyjście:

+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |

SQL

%sql
select max(book_price) from books_vw;

Wyjście:

22.45

Operacja sumowania

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Wyjście:

res46: Double = 80.08000087738037

Interfejs API ramki danych

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Wyjście:

+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |

SQL

select sum(book_price) from books_vw;

Wyjście:

80.08000087738037

Operacja top lub porównywalna

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Wyjście:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

Interfejs API ramki danych

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Wyjście:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Następny krok