Leggere i dati dalle tabelle di Azure Cosmos DB for Apache Cassandra usando Spark
SI APPLICA A: Cassandra
Questo articolo descrive come leggere i dati archiviati in Azure Cosmos DB for Apache Cassandra da Spark.
Configurazione dell'API per Cassandra
Impostare la configurazione Spark seguente nel cluster del notebook. Si tratta di un'attività una tantum.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Se si usa Spark 3.x, non è necessario installare l'helper e la factory di connessione di Azure Cosmos DB. È anche consigliabile usare remoteConnectionsPerExecutor
anziché connections_per_executor_max
per il connettore Spark 3 (vedere sopra).
Avviso
I campioni di Spark 3 illustrati in questo articolo sono stati testati con Spark versione 3.2.1 e il connettore Cassandra Spark corrispondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Le versioni successive di Spark e/o del connettore Cassandra potrebbero non funzionare come previsto.
API dataframe
Leggere le tabelle usando il comando session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Leggere le tabelle usando spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Leggere colonne specifiche in una tabella
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Applica filtri
È possibile eseguire il push dei predicati nel database per consentire query Spark ottimizzate migliori. Un predicato è una condizione in una query che restituisce true o false, che si trova generalmente nella clausola WHERE. Il push di un predicato filtra i dati nella query di database, riducendo il numero di voci recuperate dal database e migliorando le prestazioni delle query. Per impostazione predefinita, l'API del set di dati Spark eseguirà automaticamente il push delle clausole WHERE valide nel database.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
La sezione Cassandra Filters
del piano fisico include il filtro sottoposto a push verso il basso.
API RDD
Leggere una tabella
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Leggere colonne specifiche in una tabella
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Viste SQL
Creare una vista temporanea da un dataframe
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Eseguire query sulla vista
select * from books_vw where book_pub_year > 1891
Passaggi successivi
Di seguito sono indicati altri articoli sull'uso di Azure Cosmos DB for Apache Cassandra da Spark: