DDL-Vorgänge in Azure Cosmos DB for Apache Cassandra von Spark
GILT FÜR: Cassandra
In diesem Artikel werden DDL-Vorgänge im Keyspace und in Tabellen für Azure Cosmos DB for Apache Cassandra von Spark beschrieben.
Spark-Kontext
Der Connector für die API für Cassandra erfordert, dass die Details der Cassandra-Verbindung als Teil des Spark-Kontexts initialisiert werden. Beim Starten eines Notebooks ist der Spark-Kontext bereits initialisiert, und es ist nicht ratsam, ihn zu beenden und erneut zu initialisieren. Eine Lösung ist, die Konfiguration der API-für-Cassandra-Instanz auf Clusterebene hinzuzufügen, in der Cluster-Spark-Konfiguration. Dies ist eine einmalige Aktivität pro Cluster. Fügen Sie der Spark-Konfiguration den folgenden Code als ein durch Leerzeichen getrenntes Schlüssel-Wert-Paar hinzu:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Konfiguration im Zusammenhang mit der API für Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Hinweis
Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor
anstelle von connections_per_executor_max
für den Spark 3-Connector verwenden (siehe oben).
Warnung
Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Keyspace-DDL-Vorgänge
Erstellen eines Keyspace
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Daraufhin sollte der Keyspace, den Sie zuvor erstellt haben, angezeigt werden.
DESCRIBE keyspaces;
Verwerfen eines Keyspace
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Überprüfen in cqlsh
DESCRIBE keyspaces;
Tabellen-DDL-Vorgänge
Überlegungen:
- Durchsatz kann mithilfe der Anweisung „create table“ auf Tabellenebene zugewiesen werden.
- In einem Partitionsschlüssel können 20 GB Daten gespeichert werden.
- In einem Datensatz können bis zu 2 MB Daten gespeichert werden.
- In einem Partitionsschlüsselbereich können mehrere Partitionsschlüssel gespeichert werden.
Erstellen einer Tabelle
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Daraufhin sollte die Tabelle „books“ angezeigt werden:
USE books_ks;
DESCRIBE books;
Der bereitgestellte Durchsatz und die Standard-TTL-Werte werden in der Ausgabe des vorherigen Befehls nicht angezeigt. Sie können diese Werte im Portal abrufen.
ALTER TABLE
Sie können die folgenden Werte mit dem Befehl ALTER TABLE ändern:
- Bereitgestellter Durchsatz
- Wert der Gültigkeitsdauer
Spaltenänderungen werden derzeit nicht unterstützt.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Löschen einer Tabelle
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Überprüfen in cqlsh
Führen Sie den folgenden Befehl in cqlsh aus. Sie sollten sehen, dass die Tabelle „books“ nicht mehr verfügbar ist:
USE books_ks;
DESCRIBE tables;
Nächste Schritte
Fahren Sie nach dem Erstellen von Keyspace und Tabelle mit den folgenden Artikeln für CRUD-Vorgänge und Ähnliches fort: