Lectura de datos de tablas de Azure Cosmos DB for Apache Cassandra con Spark
SE APLICA A: Cassandra
En este artículo se describe cómo leer los datos almacenados en Azure Cosmos DB for Apache Cassandra desde Spark.
Configuración de la API para Cassandra
Establezca la configuración de Spark siguiente en el clúster del cuaderno. Es una actividad única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Si usa Spark 3.x, no es necesario instalar el asistente de Azure Cosmos DB ni el generador de conexiones. También debe usar remoteConnectionsPerExecutor
en lugar de connections_per_executor_max
para el conector de Spark 3 (consulte más arriba).
Advertencia
Los ejemplos de Spark 3 que se muestran en este artículo se han probado con la versión 3.2.1 de Spark y el conector de Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 correspondiente. Es posible que las versiones posteriores de Spark o del conector de Cassandra no funcionen según lo previsto.
Dataframe API
Lectura de la tabla con el comando session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Lectura de la tabla con spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Lectura de columnas específicas de la tabla
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Aplicación de filtros
Puede insertar predicados en la base de datos para permitir consultas Spark mejor optimizadas. Un predicado es una condición de una consulta que devuelve true o false, y que normalmente se encuentra en la cláusula WHERE. Un predicado inserta filtros en los datos de la consulta de base de datos, lo que reduce el número de entradas recuperadas de la base de datos y mejora el rendimiento de las consultas. De forma predeterminada, la API Dataset de Spark insertará automáticamente cláusulas WHERE válidas en la base de datos.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
La sección Cassandra Filters
del plan físico incluye el filtro insertado.
RDD API
Lectura de tabla
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Lectura de columnas específicas de la tabla
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Vistas SQL
Creación de una vista temporal desde un dataframe
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Ejecución de consultas en la vista
select * from books_vw where book_pub_year > 1891
Pasos siguientes
Los siguientes son artículos adicionales sobre cómo trabajar con Azure Cosmos DB for Apache Cassandra desde Spark:
- Upsert operations (Operaciones de upsert)
- Delete operations (Operaciones de eliminación)
- Aggregation operations (Operaciones de agregación)
- Table copy operations (Operaciones de copia en tabla)