Udostępnij za pośrednictwem


Odczytywanie danych z usługi Azure Cosmos DB dla tabel apache Cassandra przy użyciu platformy Spark

DOTYCZY: Kasandra

W tym artykule opisano sposób odczytywania danych przechowywanych w usłudze Azure Cosmos DB dla systemu Apache Cassandra z platformy Spark.

Interfejs API dla konfiguracji bazy danych Cassandra

Ustaw poniższą konfigurację platformy Spark w klastrze notesów. Jest to jednorazowe działanie.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Uwaga

Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor zamiast connections_per_executor_max łącznika Spark 3 (zobacz powyżej).

Ostrzeżenie

Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.

Interfejs API ramki danych

Odczytywanie tabeli przy użyciu polecenia session.read.format

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Odczytywanie tabeli przy użyciu elementu spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Odczytywanie określonych kolumn w tabeli

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Zastosuj filtry

Predykaty można wypchnąć do bazy danych, aby umożliwić lepsze zoptymalizowane zapytania platformy Spark. Predykat to warunek zapytania, który zwraca wartość true lub false, zazwyczaj znajduje się w klauzuli WHERE. Predykat filtruje dane w zapytaniu bazy danych, zmniejszając liczbę wpisów pobranych z bazy danych i poprawiając wydajność zapytań. Domyślnie interfejs API zestawu danych platformy Spark automatycznie wypycha prawidłowe klauzule WHERE do bazy danych.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Sekcja Cassandra Filters planu fizycznego zawiera filtr wypychany.

partycje

RDD API

Odczytywanie tabeli

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Odczytywanie określonych kolumn w tabeli

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

Widoki SQL

Tworzenie widoku tymczasowego na podstawie ramki danych

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Uruchamianie zapytań względem widoku

select * from books_vw where book_pub_year > 1891

Następne kroki

Poniżej przedstawiono dodatkowe artykuły dotyczące pracy z usługą Azure Cosmos DB dla systemu Apache Cassandra z platformy Spark: