Accéder à Azure Cosmos DB for Apache Cassandra à partir de Spark sur YARN avec HDInsight
S’APPLIQUE À : Cassandra
Cet article explique comment accéder à Azure Cosmos DB for Apache Cassandra à partir de Spark sur YARN avec HDInsight-Spark au moyen de spark-shell
. HDInsight est la plateforme PaaS Hortonworks Hadoop de Microsoft sur Azure. Elle utilise le stockage d’objets pour HDFS et se décline en plusieurs versions, dont Spark. Bien que cet article fasse référence à HDInsight-Spark, il s’applique à toutes les distributions Hadoop.
Prérequis
Avant de commencer, passez en revue les principes fondamentaux de la connexion à Azure Cosmos DB for Apache Cassandra.
Vous devez respecter les prérequis suivants :
Approvisionnez Azure Cosmos DB for Apache Cassandra. Consultez Créer un compte de base de données.
Approvisionnez un cluster HDInsight-Spark. Consultez Créer un cluster Apache Spark dans Azure HDInsight à l’aide d’un modèle Resource Manager.
API pour la configuration de Cassandra dans Spark2. Le connecteur Spark pour Cassandra exige que les informations de la connexion Cassandra soient initialisées dans le contexte Spark. Lorsque vous lancez un notebook Jupyter, la session et le contexte spark sont déjà initialisés. N’arrêtez pas et ne réinitialisez pas le contexte Spark tant qu’il n’est pas doté de la configuration complète définie dans le cadre du démarrage du notebook Jupyter par défaut de HDInsight. Une solution de contournement consiste à ajouter directement les détails de l’instance Cassandra à la configuration du service Spark2 Ambari. Cette opération n’est nécessaire qu’une seule fois pour chacun des clusters qui requièrent un redémarrage du service Spark2.
Accédez au service Spark2 Ambari, puis sélectionnez les configurations.
Accédez aux valeurs spark2-defaults personnalisées et ajoutez une nouvelle propriété avec le code suivant, puis redémarrez le service Spark2 :
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Vous pouvez l’utiliser cqlsh
pour la validation. Pour plus d’information, consultez Connexion à Azure Cosmos DB for Apache Cassandra à partir de Spark.
Accéder à Azure Cosmos DB for Apache Cassandra à partir de l’interpréteur de commandes Spark
L’interpréteur de commandes Spark est utilisé à des fins de test et d’exploration.
Lancez
spark-shell
avec les dépendances maven requises compatibles avec la version Spark de votre cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Exécutez certaines opérations DDL et DML.
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Exécutez des opérations CRUD.
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Accéder à Azure Cosmos DB for Apache Cassandra à partir de notebooks Jupyter
HDInsight-Spark est fourni avec les services de bloc-notes Zeppelin et Jupyter. Ce sont deux environnements de notebook basés sur le web qui prennent en charge Scala et Python. Les notebooks sont parfaits pour l’analytique exploratoire interactive et la collaboration, mais ne sont pas conçus pour les processus opérationnels ou de production.
Les notebooks Jupyter suivants peuvent être chargés dans votre cluster HDInsight Spark et fournissent des exemples d’utilisation d’Azure Cosmos DB for Apache Cassandra immédiatement exploitables. Veillez à consulter le premier notebook 1.0-ReadMe.ipynb
, qui contient la configuration du service Spark requise pour la connexion à Azure Cosmos DB for Apache Cassandra.
Téléchargez les notebooks disponibles sous azure-cosmos-db-cassandra-api-spark-notebooks-jupyter sur votre machine.
Procédure à suivre pour le chargement
Quand vous lancez Jupyter, accédez à Scala. Créez un répertoire, puis chargez-y les notebooks. Le bouton Charger se trouve en haut à droite.
Procédure à suivre pour l’exécution
Parcourez les notebooks et chaque cellule de ceux-ci séquentiellement. Sélectionnez le bouton Exécuter en haut de chaque notebook pour exécuter toutes les cellules, ou appuyez sur Maj+Entrée pour chaque cellule.
Accéder à Azure Cosmos DB for Apache Cassandra à partir de votre programme Scala Spark
Pour les processus automatisés dans l’environnement de production, les programmes Spark sont envoyés au cluster au moyen du script spark-submit.