Connettersi ad Azure Cosmos DB per Apache Cassandra da Spark
SI APPLICA A: Cassandra
Questo articolo è uno di una serie di articoli sull'integrazione di Azure Cosmos DB per Apache Cassandra da Spark. Gli articoli trattano la connettività, le operazioni DDL (Data Definition Language), le operazioni DML (Data Manipulation Language) e l'integrazione avanzata di Azure Cosmos DB per Apache Cassandra da Spark.
Prerequisiti
Effettuare il provisioning di un account Azure Cosmos DB for Apache Cassandra.
Effettuare il provisioning dell'ambiente Spark di propria scelta [Azure Databricks | Azure HDInsight-Spark | Altri].
Dipendenze per la connettività
Connettore Spark per Cassandra: il connettore Spark viene usato per connettersi ad Azure Cosmos DB per Apache Cassandra. Identificare e usare la versione del connettore nel repository Maven compatibile con le versioni di Spark e Scala dell'ambiente Spark. È consigliabile un ambiente che supporti Spark 3.2.1 o versione successiva e il connettore Spark disponibile nelle coordinate maven
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Se si usa Spark 2.x, è consigliabile un ambiente con Spark versione 2.4.5, usando il connettore Spark alle coordinate mavencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Libreria helper di Azure Cosmos DB per l'API per Cassandra: se si usa una versione Spark 2.x, oltre al connettore Spark, è necessaria un'altra libreria denominata azure-cosmos-cassandra-spark-helper con coordinate maven
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
di Azure Cosmos DB per gestire la limitazione della frequenza. Questa libreria contiene una classe factory di connessione e una classe per i criteri di ripetizione, entrambe personalizzate.I criteri di ripetizione in Azure Cosmos DB sono configurati per gestire le eccezioni con codice stato HTTP 429 ("La frequenza delle richieste è troppo elevata"). Azure Cosmos DB per Apache Cassandra converte queste eccezioni in errori di overload per il protocollo nativo Cassandra ed è possibile riprovare con backoff. Dato che Azure Cosmos DB usa il modello di velocità effettiva con provisioning, quando il traffico in ingresso/uscita aumenta si verificano eccezioni di limitazione della frequenza delle richieste. I criteri di ripetizione proteggono i processi Spark da picchi di dati che superano temporaneamente la velocità effettiva allocata per il contenitore. Se si usa il connettore Spark 3.x, l'implementazione di questa libreria non è necessaria.
Nota
I criteri di ripetizione possono proteggere i processi di Spark solo da picchi momentanei. Se non è stato configurato un numero sufficiente di UR per eseguire il carico di lavoro, i criteri di ripetizione non sono applicabili e la classe dei criteri di ripetizione genera nuovamente l'eccezione.
Dettagli della connessione all'account Azure Cosmos DB: nome dell'account Azure per Cassandra, endpoint dell'account e chiave.
Ottimizzazione della configurazione della velocità effettiva del connettore Spark
Nella sezione successiva sono elencati tutti i parametri pertinenti per controllare la velocità effettiva usando il connettore Spark per Cassandra. Per ottimizzare i parametri al fine di massimizzare la velocità effettiva per i processi Spark, le configurazioni spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
e spark.cassandra.input.reads_per_sec
devono essere configurate correttamente, per evitare troppe limitazioni e back-off (che a loro volta possono portare a una velocità effettiva inferiore).
Il valore ottimale di queste configurazioni dipende da quattro fattori:
- Quantità di velocità effettiva (unità richiesta) configurata per la tabella in cui vengono inseriti i dati.
- Numero di ruoli di lavoro nel cluster Spark.
- Numero di executor configurati per il processo Spark (che può essere controllato usando
spark.cassandra.connection.connections_per_executor_max
ospark.cassandra.connection.remoteConnectionsPerExecutor
a seconda della versione di Spark) - Latenza media di ogni richiesta ad Azure Cosmos DB, se si è collocati nello stesso Data Center. Si supponga che questo valore sia 10 ms per le operazioni di scrittura e 3 ms per le letture.
Ad esempio, se sono presenti cinque ruoli di lavoro e un valore spark.cassandra.output.concurrent.writes
= 1 e un valore = spark.cassandra.connection.remoteConnectionsPerExecutor
1, sono presenti cinque ruoli di lavoro che scrivono simultaneamente nella tabella, ognuno con un thread. Se sono necessari 10 ms per eseguire una singola scrittura, è possibile inviare 100 richieste (1000 millisecondi diviso per 10) al secondo, per thread. Con cinque ruoli di lavoro, si tratta di 500 scritture al secondo. A un costo medio di cinque unità richiesta (UR) per scrittura, la tabella di destinazione richiederebbe un provisioning minimo di 2500 unità richiesta (5 UR x 500 scritture al secondo).
L'aumento del numero di executor può aumentare il numero di thread in un determinato processo, che a sua volta può aumentare la velocità effettiva. Tuttavia, l'impatto esatto che questo può avere può essere variabile a seconda del processo, mentre il controllo della velocità effettiva con il numero di ruoli di lavoro è più deterministico. È anche possibile determinare il costo esatto di una determinata richiesta profilandolo per ottenere l'addebito di unità richiesta (UR). Ciò consente di essere più accurati durante il provisioning della velocità effettiva per la tabella o lo spazio chiavi. Vedere l'articolo qui per informazioni su come ottenere gli addebiti per unità richiesta a livello di richiesta.
Ridimensionamento della velocità effettiva nel database
Il connettore Cassandra Spark saturerà la velocità effettiva in Azure Cosmos DB in modo efficiente. Di conseguenza, anche in presenza di tentativi efficaci, è necessario assicurarsi di disporre di una velocità effettiva sufficiente (UR) di cui è stato effettuato il provisioning a livello di tabella o di keyspace per evitare errori correlati alla limitazione della frequenza. L'impostazione minima di 400 UR in una determinata tabella o keyspace non sarà sufficiente. Anche con impostazioni minime di configurazione della velocità effettiva, il connettore Spark può scrivere a una velocità corrispondente a circa 6000 unità richiesta o più.
Se l'impostazione UR necessaria per lo spostamento dei dati con Spark è superiore a quella necessaria per il carico di lavoro con stato stabile, è possibile aumentare e ridurre sistematicamente la velocità effettiva in Azure Cosmos DB per soddisfare le esigenze del carico di lavoro per un determinato periodo di tempo. Leggere l'articolo su scalabilità elastica nell'API per Cassandra per comprendere le diverse opzioni per il ridimensionamento a livello di codice e in modo dinamico.
Nota
Le linee guida precedenti presuppongono una distribuzione ragionevolmente uniforme dei dati. Se si dispone di un'asimmetria significativa nei dati, ovvero un numero eccessivo di letture/scritture nello stesso valore della chiave di partizione, è comunque possibile che si verifichino dei colli di bottiglia, anche se nella tabella è stato effettuato il provisioning di un numero elevato di unità richiesta. Le unità richiesta sono divise equamente tra le partizioni fisiche, e una pesante asimmetria dei dati può causare un collo di bottiglia delle richieste a una singola partizione.
Parametri di configurazione della velocità effettiva del connettore Spark
La tabella seguente elenca i parametri di configurazione della velocità effettiva specifici di Azure Cosmos DB per Apache Cassandra forniti dal connettore. Per un elenco dettagliato di tutti i parametri di configurazione, vedere la pagina di riferimento per la configurazione del repository GitHub del connettore Cassandra Spark.
Nome proprietà | Valore predefinito: | Descrizione |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Numero di righe per ogni singolo batch. Impostare questo parametro su 1. Questo parametro viene usato per ottenere una velocità effettiva maggiore per carichi di lavoro pesanti. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | None | Numero massimo di connessioni per ogni nodo per ogni executor. 10*n equivale a 10 connessioni per nodo in un cluster Cassandra con n-nodi. Pertanto, se sono necessarie cinque connessioni per nodo per ogni executor per un cluster Cassandra a cinque nodi, è necessario impostare questa configurazione su 25. Modificare questo valore in base al grado di parallelismo o al numero di executor per cui sono configurati i processi Spark. |
spark.cassandra.output.concurrent.writes | 100 | Definisce il numero di scritture parallele che possono verificarsi per ogni executor. Dato che "batch.size.rows" è impostato su 1, assicurarsi di aumentare questo valore di conseguenza. Modificare questo valore in base al grado di parallelismo o alla velocità effettiva che si vuole ottenere per il carico di lavoro. |
spark.cassandra.concurrent.reads | 512 | Definisce il numero di letture parallele che possono verificarsi per ogni executor. Modificare questo valore in base al grado di parallelismo o alla velocità effettiva che si vuole ottenere per il carico di lavoro. |
spark.cassandra.output.throughput_mb_per_sec | None | Definisce la velocità effettiva di scrittura totale per ogni executor. Questo parametro può essere usato come limite superiore per la velocità effettiva dei processi Spark, basandolo sulla velocità effettiva con provisioning del contenitore Di Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | None | Definisce la velocità effettiva di lettura totale per ogni executor. Questo parametro può essere usato come limite superiore per la velocità effettiva dei processi Spark, basandolo sulla velocità effettiva con provisioning del contenitore di Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definisce il numero di batch per ogni singola attività Spark che possono essere archiviati in memoria prima dell'invio all'API per Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Definisce il periodo di tempo fino a quando sono disponibili connessioni inutilizzate. |
Regolare la velocità effettiva e il grado di parallelismo di questi parametri in base al carico di lavoro previsto per i processi Spark e alla velocità effettiva di cui è stato effettuato il provisioning per l'account Azure Cosmos DB.
Connettersi ad Azure Cosmos DB per Apache Cassandra da Spark
cqlsh
I comandi seguenti illustrano in dettaglio come connettersi ad Azure Cosmos DB per Apache Cassandra da cqlsh. Queste informazioni sono utili per la convalida durante l'esecuzione degli esempi in Spark.
Da Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
L'articolo seguente descrive il provisioning di cluster Azure Databricks, la configurazione del cluster per la connessione ad Azure Cosmos DB per Apache Cassandra e vari notebook di esempio che illustrano le operazioni DDL, le operazioni DML e altro ancora.
Usare Azure Cosmos DB per Apache Cassandra da Azure Databricks
2. Azure HDInsight-Spark
L'articolo seguente descrive il servizio HDinsight-Spark, il provisioning, la configurazione del cluster per la connessione ad Azure Cosmos DB per Apache Cassandra e vari notebook di esempio che illustrano operazioni DDL, operazioni DML e altro ancora.
Usare Azure Cosmos DB per Apache Cassandra da Azure HDInsight-Spark
3. Ambiente Spark in generale
Mentre le sezioni precedenti sono specifiche per servizi PaaS basati su Spark di Azure, questa sezione è dedicata agli ambienti Spark generali. Di seguito sono indicate in dettaglio le dipendenze del connettore, le importazioni e la configurazione della sessione Spark. La sezione "Passaggi successivi" include collegamenti a esempi di codice per operazioni DDL, operazioni DML e altro.
Dipendenze del connettore:
- Aggiungere le coordinate di maven per ottenere il connettore Cassandra per Spark
- Aggiungere le coordinate maven per la libreria helper di Azure Cosmos DB per l'API per Cassandra
Importazioni:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Configurazione della sessione Spark:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Passaggi successivi
Gli articoli seguenti illustrano l'integrazione di Spark con Azure Cosmos DB per Apache Cassandra.