Se connecter à Azure Cosmos DB for Apache Cassandra à partir de Spark
S’APPLIQUE À : Cassandra
Cet article fait partie d’une série d’articles sur l’intégration d’Azure Cosmos DB for Apache Cassandra à partir de Spark. Ces articles abordent la connectivité, les opérations du langage de définition de données (DDL), les opérations du langage de manipulation de données (DML) et l’intégration avancée de l’Azure Cosmos DB for Apache Cassandra à partir de Spark.
Prérequis
Approvisionner un compte Azure Cosmos DB for Apache Cassandra.
Provisionner votre choix d’environnement Spark [Azure Databricks | Azure HDInsight-Spark | Autres].
Dépendances pour la connectivité
Connecteur Spark pour Cassandra : le connecteur Spark est utilisé pour se connecter à Azure Cosmos DB for Apache Cassandra. Identifiez et utilisez la version du connecteur située dans le dépôt central Maven qui est compatible avec les versions Spark et Scala de votre environnement Spark. Nous recommandons un environnement prenant en charge Spark 3.2.1 ou une version supérieure et le connecteur Spark accessible aux coordonnées Maven
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Si vous utilisez Spark 2.x, nous vous recommandons d’utiliser un environnement avec Spark version 2.4.5, avec le connecteur Spark aux coordonnées Mavencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Bibliothèque d’assistance Azure Cosmos DB pour l’API pour Cassandra : Si vous utilisez une version Spark 2.x, en plus du connecteur Spark, vous avez besoin d’une autre bibliothèque appelée azure-cosmos-cassandra-spark-helper avec les coordonnées Maven
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
d’Azure Cosmos DB afin de gérer la limitation de débit. Cette bibliothèque contient des classes personnalisées de fabrique de connexion et de stratégie de nouvelle tentative.La stratégie de nouvelles tentatives dans Azure Cosmos DB est configurée pour gérer les exceptions de code d’état HTTP 429 (« Requêtes trop nombreuses »). Azure Cosmos DB for Apache Cassandra traduit ces exceptions en erreurs surchargées sur le protocole natif Cassandra, et vous pouvez réessayer avec des interruptions. Comme Azure Cosmos DB utilise le modèle de débit provisionné, des exceptions de limitation du taux de requêtes se produisent quand les taux d’entrée/de sortie augmentent. La stratégie de nouvelles tentatives protège vos travaux Spark contre les pics de données qui dépassent momentanément le débit alloué pour votre conteneur. Si vous utilisez le connecteur Spark 3.x, l’implémentation de cette bibliothèque n’est pas nécessaire.
Notes
La stratégie de nouvelles tentatives peut protéger vos travaux Spark contre les pics momentanés uniquement. Si vous n’avez pas configuré suffisamment d’unités de requête nécessaires pour exécuter votre charge de travail, la stratégie de nouvelles tentatives n’est pas applicable et la classe de stratégie de nouvelles tentatives lève à nouveau l’exception.
Détails de connexion de compte Azure Cosmos DB : nom de compte, point de terminaison de compte et clé de votre API Azure pour Cassandra.
Optimisation de la configuration du débit du connecteur Spark
La section suivante répertorie tous les paramètres pertinents pour contrôler le débit à l’aide du connecteur Spark pour Cassandra. Afin d’optimiser les paramètres pour maximiser le débit des travaux Spark, les configurations spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
et spark.cassandra.input.reads_per_sec
doivent être correctement configurées afin d’éviter une limitation de bande passante et un backoff trop importants (qui peuvent à leur tour entraîner une baisse du débit).
La valeur optimale de ces configurations dépend de quatre facteurs :
- La quantité de débit (unités de requête) configurée pour la table dans laquelle les données sont ingérées.
- Le nombre de Workers dans votre cluster Spark.
- Le nombre d’exécuteurs configurés pour votre travail Spark (qui peut être contrôlé à l’aide de
spark.cassandra.connection.connections_per_executor_max
ouspark.cassandra.connection.remoteConnectionsPerExecutor
en fonction de la version de Spark). - La latence moyenne de chaque requête à Azure Cosmos DB, si vous êtes colocalisé dans le même centre de données. Supposons que cette valeur soit de 10 ms pour les écritures et de 3 ms pour les lectures.
Par exemple, si nous avons cinq Workers, la valeur spark.cassandra.output.concurrent.writes
= 1 et la valeur spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, alors nous avons cinq Workers qui écrivent simultanément dans la table, chacun avec un thread. S’il faut 10 ms pour effectuer une seule écriture, nous pouvons envoyer 100 requêtes (1000 millisecondes divisées par 10) par seconde et par thread. Avec cinq Workers, cela représenterait 500 écritures par seconde. À un coût moyen de cinq unités de requête (RU) par écriture, la table cible aurait besoin au minimum de 2500 unités de requête provisionnées (5 RU x 500 écritures par seconde).
L’augmentation du nombre d’exécuteurs peut accroître le nombre de threads dans un travail donné, ce qui peut à son tour augmenter le débit. Toutefois, l’impact exact de cette augmentation peut varier en fonction du travail, alors que le contrôle du débit par le nombre de Workers est plus déterministe. Vous pouvez également déterminer le coût exact d’une requête donnée en la profilant pour obtenir les frais d’unités de requête (RU). Cela vous aidera à être plus précis lors de l’approvisionnement du débit de votre table ou votre espace de clés. Consultez notre article ici pour comprendre comment obtenir des frais d’unités de requête par requête.
Mise à l’échelle du débit dans la base de données
Le connecteur Cassandra Spark sature efficacement le débit dans Azure Cosmos DB. Ainsi, même si de nouvelles tentatives sont efficaces, vous devez veiller à ce que le débit provisionné (RU) soit suffisant au niveau des tables ou des espaces de clés pour éviter des erreurs liées à la limitation du débit. Le paramètre minimal de 400 RU pour une table ou un espace de clés donné ne suffira pas. Même avec les paramètres de configuration du débit minimum, le connecteur Spark peut écrire à hauteur d’environ 6 000 unités de requête, voire plus.
Si le paramètre de RU requis pour le déplacement des données avec Spark est plus élevé que ce qui est nécessaire pour votre charge de travail à l’état stable, vous pouvez facilement effectuer un scale-up ou un scale-down du débit de manière systématique dans Azure Cosmos DB pour répondre aux besoins de votre charge de travail pendant une période donnée. Lisez notre article sur la mise à l’échelle élastique dans l’API pour Cassandra pour comprendre les différentes options de mise à l’échelle dynamique et par programmation.
Notes
L’aide ci-dessus suppose une distribution raisonnablement uniforme des données. Si les données sont fortement asymétriques (c’est-à-dire un nombre anormalement élevé de lectures/écritures de la même valeur de clé de partition), vous risquez de rencontrer des goulots d’étranglement, même si vous avez approvisionné un grand nombre d’unités de requête dans votre table. Les unités de requête sont réparties de manière égale entre les partitions physiques, et une forte asymétrie des données peut provoquer un goulot d’étranglement des requêtes adressées à une seule partition.
Paramètres de configuration du débit du connecteur Spark
Le tableau suivant répertorie les paramètres de configuration du débit spécifique à Azure Cosmos DB for Apache Cassandra fournis par le connecteur. Pour obtenir une liste détaillée de tous les paramètres de configuration, consultez la page Informations de référence sur la configuration du dépôt GitHub du connecteur Cassandra Spark.
Nom de la propriété | Valeur par défaut | Description |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Nombre de lignes par lot. Définissez ce paramètre sur 1. Ce paramètre est utilisé pour obtenir un débit plus élevé pour les charges de travail importantes. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Aucun | Nombre maximal de connexions par nœud et par exécuteur. 10*n équivaut à 10 connexions par nœud dans un cluster Cassandra de n nœuds. Par conséquent, si vous avez besoin de cinq connexions par nœud et par exécuteur pour un cluster Cassandra à cinq nœuds, vous devez définir cette configuration sur 25. Modifiez cette valeur en fonction du degré de parallélisme ou du nombre d’exécuteurs pour lesquels vos travaux Spark sont configurés. |
spark.cassandra.output.concurrent.writes | 100 | Définit le nombre d’écritures parallèles qui peuvent se produire par exécuteur. Étant donné que vous avez défini « batch.size.rows » sur 1, veillez à augmenter cette valeur en conséquence. Modifiez cette valeur selon le degré de parallélisme ou le débit que vous souhaitez obtenir pour votre charge de travail. |
spark.cassandra.concurrent.reads | 512 | Définit le nombre de lectures parallèles qui peuvent se produire par exécuteur. Modifiez cette valeur selon le degré de parallélisme ou le débit que vous souhaitez obtenir pour votre charge de travail. |
spark.cassandra.output.throughput_mb_per_sec | Aucun | Définit le débit d’écriture total par exécuteur. Ce paramètre peut être utilisé comme limite supérieure pour le débit de vos travaux Spark et le baser sur le débit provisionné de votre conteneur Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | Aucun | Définit le débit de lecture total par exécuteur. Ce paramètre peut être utilisé comme limite supérieure pour le débit de vos travaux Spark et le baser sur le débit provisionné de votre conteneur Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1 000 | Définit le nombre de lots par tâche Spark qui peuvent être stockés en mémoire avant d’être envoyés à l’API pour Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Définit la période pendant laquelle les connexions inutilisées sont disponibles. |
Ajustez le débit et le degré de parallélisme de ces paramètres en fonction de la charge de travail que vous prévoyez pour vos travaux Spark et du débit que vous avez provisionné pour votre compte Azure Cosmos DB.
Se connecter à Azure Cosmos DB for Apache Cassandra à partir de Spark
cqlsh
Les commandes suivantes décrivent en détail comment vous connecter à Azure Cosmos DB for Apache Cassandra à partir de cqlsh. Cela est utile pour la validation quand vous exécutez les exemples dans Spark.
À partir d’Unix/Linux/Mac :
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
L’article ci-dessous aborde le provisionnement de clusters Azure Databricks, la configuration de clusters pour la connexion à Azure Cosmos DB for Apache Cassandra et plusieurs exemples de notebooks qui traitent des opérations DDL et des opérations DML, entre autres.
Utiliser Azure Cosmos DB for Apache Cassandra à partir d’Azure Databricks
2. Azure HDInsight Spark
L’article ci-dessous aborde le service HDinsight-Spark, le provisionnement, la configuration de clusters pour la connexion à Azure Cosmos DB for Apache Cassandra et plusieurs exemples de notebooks qui traitent des opérations DDL et des opérations DML, entre autres.
Utiliser Azure Cosmos DB for Apache Cassandra à partir d’Azure HDInsight-Spark
3. Généralités sur l’environnement Spark
Alors que les sections ci-dessus étaient spécifiques aux services PaaS basés sur Azure Spark, cette section traite de n’importe quel environnement Spark général. Les dépendances du connecteur, les importations et la configuration de session Spark sont décrites en détail ci-dessous. La section « Étapes suivantes » présente des exemples de code pour les opérations DDL et les opérations DML, entre autres.
Dépendances du connecteur :
- Ajouter les coordonnées Maven pour obtenir le connecteur Cassandra pour Spark
- Ajouter les coordonnées Maven pour la bibliothèque d’assistance Azure Cosmos DB pour l’API pour Cassandra
Importations :
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Configuration de session Spark :
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Étapes suivantes
Les articles suivants illustrent l’intégration Spark à Azure Cosmos DB for Apache Cassandra.