Udostępnij za pośrednictwem


Operacje DDL w usłudze Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark

DOTYCZY: Kasandra

W tym artykule szczegółowo przestrzeń kluczy i operacje DDL tabeli dla usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark.

Kontekst platformy Spark

Łącznik dla interfejsu API dla bazy danych Cassandra wymaga zainicjowania szczegółów połączenia Cassandra w ramach kontekstu platformy Spark. Po uruchomieniu notesu kontekst platformy Spark jest już inicjowany i nie zaleca się jego zatrzymywania i ponownego inicjowania. Jednym z rozwiązań jest dodanie interfejsu API dla konfiguracji wystąpienia bazy danych Cassandra na poziomie klastra w konfiguracji platformy Spark klastra. Jest to jednorazowe działanie na klaster. Dodaj następujący kod do konfiguracji platformy Spark jako parę wartości klucza rozdzielanego spacją:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Uwaga

Jeśli używasz platformy Spark 3.x, nie musisz instalować pomocnika i fabryki połączeń usługi Azure Cosmos DB. Należy również użyć remoteConnectionsPerExecutor zamiast connections_per_executor_max łącznika Spark 3 (zobacz powyżej).

Ostrzeżenie

Przykłady platformy Spark 3 pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.2.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.

Operacje DDL przestrzeni kluczy

Tworzenie przestrzeni kluczy

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Weryfikowanie w języku cqlsh

Uruchom następujące polecenie w języku cqlsh i powinien zostać wyświetlony utworzony wcześniej obszar kluczy.

DESCRIBE keyspaces;

Usuwanie przestrzeni kluczy

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Weryfikowanie w języku cqlsh

DESCRIBE keyspaces;

Operacje DDL tabeli

Ważne kwestie:

  • Przepływność można przypisać na poziomie tabeli przy użyciu instrukcji create table.
  • Jeden klucz partycji może przechowywać 20 GB danych.
  • Jeden rekord może przechowywać maksymalnie 2 MB danych.
  • Jeden zakres kluczy partycji może przechowywać wiele kluczy partycji.

Utwórz tabelę

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Weryfikowanie w języku cqlsh

Uruchom następujące polecenie w narzędziu cqlsh i powinna zostać wyświetlona tabela o nazwie "books:

USE books_ks;
DESCRIBE books;

Aprowizowana przepływność i domyślne wartości czasu wygaśnięcia nie są wyświetlane w danych wyjściowych poprzedniego polecenia. Te wartości można pobrać z portalu.

Zmienianie tabeli

Następujące wartości można zmienić za pomocą polecenia alter table:

  • Aprowizowana przepływność
  • time-to-live value
    Zmiany kolumn nie są obecnie obsługiwane.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Upuść tabelę

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Weryfikowanie w języku cqlsh

Uruchom następujące polecenie w narzędziu cqlsh i upewnij się, że tabela "books" nie jest już dostępna:

USE books_ks;
DESCRIBE tables;

Następne kroki

Po utworzeniu przestrzeni kluczy i tabeli przejdź do następujących artykułów dotyczących operacji CRUD i nie tylko: