Odczytywanie danych z usługi Azure Cosmos DB dla tabel apache Cassandra przy użyciu platformy Spark
DOTYCZY: Kasandra
W tym artykule opisano sposób odczytywania danych przechowywanych w usłudze Azure Cosmos DB dla systemu Apache Cassandra z platformy Spark.
Interfejs API dla konfiguracji bazy danych Cassandra
Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Uwaga
Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor
zamiast connections_per_executor_max
łącznika Spark 3 (zobacz powyżej).
Ostrzeżenie
Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.
Interfejs API ramki danych
Odczytywanie tabeli przy użyciu polecenia session.read.format
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Odczytywanie tabeli przy użyciu elementu spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Odczytywanie określonych kolumn w tabeli
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Zastosuj filtry
Predykaty można wypchnąć do bazy danych, aby umożliwić lepsze zoptymalizowane zapytania platformy Spark. Predykat to warunek zapytania, który zwraca wartość true lub false, zazwyczaj znajduje się w klauzuli WHERE. Predykat filtruje dane w zapytaniu bazy danych, zmniejszając liczbę wpisów pobranych z bazy danych i poprawiając wydajność zapytań. Domyślnie interfejs API zestawu danych platformy Spark automatycznie wypycha prawidłowe klauzule WHERE do bazy danych.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Sekcja Cassandra Filters
planu fizycznego zawiera filtr wypychany.
RDD API
Odczytywanie tabeli
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Odczytywanie określonych kolumn w tabeli
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
Widoki SQL
Tworzenie widoku tymczasowego na podstawie ramki danych
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Uruchamianie zapytań względem widoku
select * from books_vw where book_pub_year > 1891
Następne kroki
Poniżej przedstawiono dodatkowe artykuły dotyczące pracy z usługą Azure Cosmos DB dla systemu Apache Cassandra z platformy Spark: