Lire les données de tables Azure Cosmos DB for Apache Cassandra à l’aide de Spark
S’APPLIQUE À : Cassandra
Cet article explique comment lire des données stockées dans Azure Cosmos DB for Apache Cassandra à partir de Spark.
API pour la configuration Cassandra
Définissez la configuration spark ci-dessous dans votre cluster de notebooks. Cette activité s’effectue une seule fois.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Notes
Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor
plutôt que connections_per_executor_max
pour le connecteur Spark 3 (cf. ci-dessus).
Avertissement
Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.
API Dataframe
Lire une table avec la commande session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Lire une table avec la commande spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Lire des colonnes spécifiques d’une table
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Appliquer des filtres
Vous pouvez effectuer un pushdown de prédicats vers la base de données pour optimiser encore les requêtes Spark. Un prédicat est une condition sur une requête, généralement dans la clause WHERE, qui retourne true ou false. Un pushdown de prédicat filtre les données dans la requête de base de données, réduisant ainsi le nombre d’entrées récupérées à partir de la base de données et améliorant les performances des requêtes. Par défaut, l’API Spark DataSet effectue automatiquement un pushdown des clauses WHERE valides vers la base de données.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
La section Cassandra Filters
du plan physique comprend le filtre pushed down.
API pour le jeu de donnée distribué résilient
Lire une table
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Lire des colonnes spécifiques d’une table
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Vues SQL
Créer une vue temporaire à partir d’un dataframe
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Exécuter des requêtes sur la vue
select * from books_vw where book_pub_year > 1891
Étapes suivantes
Voici des articles supplémentaires sur l’utilisation d’Azure Cosmos DB for Apache Cassandra à partir de Spark :