Aggregera åtgärder i Azure Cosmos DB för Apache Cassandra-tabeller från Spark
GÄLLER FÖR: Kassandra
Den här artikeln beskriver grundläggande aggregeringsåtgärder mot Azure Cosmos DB för Apache Cassandra-tabeller från Spark.
Kommentar
Filtrering på serversidan och sammansättning på serversidan stöds för närvarande inte i Azure Cosmos DB för Apache Cassandra.
API för Cassandra-konfiguration
Ställ in spark-konfigurationen nedan i notebook-klustret. Det är en engångsaktivitet.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Kommentar
Om du använder Spark 3.x behöver du inte installera Azure Cosmos DB-hjälpen och anslutningsfabriken. Du bör också använda remoteConnectionsPerExecutor
i stället connections_per_executor_max
för för Spark 3-anslutningsappen (se ovan).
Varning
Spark 3-exemplen som visas i den här artikeln har testats med Spark version 3.2.1 och motsvarande Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Senare versioner av Spark och/eller Cassandra-anslutningsappen kanske inte fungerar som förväntat.
Exempeldatagenerator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Antal åtgärder
RDD-API
sc.cassandraTable("books_ks", "books").count
Utdata:
count: Long = 5
Dataframe-API
Antal mot dataramar stöds för närvarande inte. Exemplet nedan visar hur du kör ett antal dataramar efter att dataramen har sparats i minnet som en lösning.
Välj ett lagringsalternativ bland följande tillgängliga alternativ för att undvika problem med "slut på minne":
MEMORY_ONLY: Det är standardalternativet för lagring. Lagrar RDD som deserialiserade Java-objekt i JVM. Om RDD:t inte får plats i minnet cachelagras inte vissa partitioner och de räknas om i farten varje gång de behövs.
MEMORY_AND_DISK: Lagrar RDD som deserialiserade Java-objekt i JVM. Om RDD:t inte får plats i minnet lagrar du de partitioner som inte får plats på disken och läser dem när det behövs från den plats där de lagras.
MEMORY_ONLY_SER (Java/Scala): Lagrar RDD som serialiserade Java-objekt – 1 byte matris per partition. Det här alternativet är utrymmeseffektivt jämfört med deserialiserade objekt, särskilt när du använder en snabb serialiserare, men mer CPU-intensiv att läsa.
MEMORY_AND_DISK_SER (Java/Scala): Det här lagringsalternativet är som MEMORY_ONLY_SER, den enda skillnaden är att det spiller partitioner som inte får plats i diskminnet i stället för att omberäkna dem när de behövs.
DISK_ONLY: Lagrar endast RDD-partitionerna på disken.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Samma som nivåerna ovan, men replikerar varje partition på två klusternoder.
OFF_HEAP (experimentell): Liknar MEMORY_ONLY_SER, men lagrar data i minnet utanför heapen, och det kräver att off-heap-minne aktiveras i förväg.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Genomsnittlig åtgärd
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Utdata:
res24: Double = 16.016000175476073
Dataframe-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Utdata:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Utdata:
16.016000175476073
Minsta åtgärd
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Utdata:
res31: Float = 11.33
Dataframe-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Utdata:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Utdata:
11.33
Maxåtgärd
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Dataframe-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Utdata:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Utdata:
22.45
Summaåtgärd
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Utdata:
res46: Double = 80.08000087738037
Dataframe-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Utdata:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Utdata:
80.08000087738037
Topp- eller jämförbar åtgärd
RDD-API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Utdata:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Dataframe-API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Utdata:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;