Operacje DDL w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark
DOTYCZY: Kasandra
W tym artykule szczegółowo przestrzeń kluczy i operacje DDL tabeli dla usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark.
Kontekst platformy Spark
Łącznik dla interfejsu API dla bazy danych Cassandra wymaga zainicjowania szczegółów połączenia Cassandra w ramach kontekstu platformy Spark. Po uruchomieniu notesu kontekst platformy Spark jest już inicjowany i nie zaleca się jego zatrzymywania i ponownego inicjowania. Jednym z rozwiązań jest dodanie interfejsu API dla konfiguracji wystąpienia bazy danych Cassandra na poziomie klastra w konfiguracji platformy Spark klastra. Jest to jednorazowe działanie na klaster. Dodaj następujący kod do konfiguracji platformy Spark jako parę wartości klucza rozdzielanego spacją:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Interfejs API dla konfiguracji związanej z usługą Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Uwaga
Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor
zamiast connections_per_executor_max
łącznika Spark 3 (zobacz powyżej).
Ostrzeżenie
Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.
Operacje DDL przestrzeni kluczy
Tworzenie przestrzeni kluczy
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Weryfikowanie w języku cqlsh
Uruchom następujące polecenie w języku cqlsh i powinien zostać wyświetlony utworzony wcześniej obszar kluczy.
DESCRIBE keyspaces;
Usuwanie przestrzeni kluczy
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Weryfikowanie w języku cqlsh
DESCRIBE keyspaces;
Operacje DDL tabeli
Ważne kwestie:
- Przepływność można przypisać na poziomie tabeli przy użyciu instrukcji create table.
- Jeden klucz partycji może przechowywać 20 GB danych.
- Jeden rekord może przechowywać maksymalnie 2 MB danych.
- Jeden zakres kluczy partycji może przechowywać wiele kluczy partycji.
Utwórz tabelę
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Weryfikowanie w języku cqlsh
Uruchom następujące polecenie w narzędziu cqlsh i powinna zostać wyświetlona tabela o nazwie "books:
USE books_ks;
DESCRIBE books;
Aprowizowana przepływność i domyślne wartości czasu wygaśnięcia nie są wyświetlane w danych wyjściowych poprzedniego polecenia. Te wartości można pobrać z portalu.
Zmienianie tabeli
Następujące wartości można zmienić za pomocą polecenia alter table:
- Aprowizowana przepływność
- time-to-live value
Zmiany kolumn nie są obecnie obsługiwane.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Upuść tabelę
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Weryfikowanie w języku cqlsh
Uruchom następujące polecenie w narzędziu cqlsh i upewnij się, że tabela "books" nie jest już dostępna:
USE books_ks;
DESCRIBE tables;
Następne kroki
Po utworzeniu przestrzeni kluczy i tabeli przejdź do następujących artykułów dotyczących operacji CRUD i nie tylko: