Accedere ad Azure Cosmos DB for Apache Cassandra da Spark in YARN con HDInsight
SI APPLICA A: Cassandra
Questo articolo illustra come accedere ad Azure Cosmos DB for Apache Cassandra da Spark in YARN con HDInsight Spark da spark-shell
. HDInsight è la soluzione PaaS Hortonworks Hadoop di Microsoft in Azure. Usa l'archiviazione di oggetti per Hadoop Distributed File System e include diverse versioni, tra cui Spark. Anche se questo articolo fa riferimento a HDInsight Spark, si applica a tutte le distribuzioni Hadoop.
Prerequisiti
Prima di iniziare, esaminare le nozioni di base sulla connessione ad Azure Cosmos DB for Apache Cassandra.
È necessario soddisfare i prerequisiti seguenti:
Effettuare il provisioning di Azure Cosmos DB for Apache Cassandra. Vedere Creare un account di database.
Effettuare il provisioning di un cluster HDInsight Spark. Vedere Creare un cluster Apache Spark in Azure HDInsight usando un modello di ARM.
Configurazione dell'API for Cassandra in Spark2. Il connettore Spark per Cassandra richiede che i dettagli della connessione a Cassandra vengano inizializzati come parte del contesto Spark. Quando si avvia un notebook Jupyter, la sessione Spark e il contesto sono già inizializzati. Non arrestare e reinizializzare il contesto Spark a meno che non sia completo di ogni set di configurazione come parte dell'avvio predefinito del notebook Jupyter di HDInsight. Una soluzione alternativa consiste nell'aggiungere i dettagli dell'istanza di Cassandra direttamente alla configurazione del servizio Ambari Spark2. Si tratta di un'attività occasionale per ogni cluster che richiede un riavvio del servizio Spark2.
Passare al servizio Ambari Spark2 e selezionare le configurazioni.
Passare alle impostazione predefinite di Spark2 personalizzate, aggiungere una nuova proprietà con il codice seguente e riavviare il servizio Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
È possibile usare cqlsh
per la convalida. Per altre informazioni, vedere Connessione ad Azure Cosmos DB for Apache Cassandra da Spark.
Accedere ad Azure Cosmos DB for Apache Cassandra dalla shell di Spark
La shell di Spark viene usata per il test e l'esplorazione.
Avviare
spark-shell
con le dipendenze di Maven necessarie compatibili con la versione di Spark del cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Eseguire alcune operazioni DDL e DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Eseguire operazioni CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Accedere ad Azure Cosmos DB for Apache Cassandra dai notebook Jupyter
HDInsight Spark include i servizi Zeppelin e Jupyter Notebook. Sono entrambi ambienti notebook basati sul Web che supportano Scala e Python. I notebook sono ideali per le analisi esplorative interattive e la collaborazione, ma non sono destinati a processi operativi o di produzione.
I notebook Jupyter seguenti possono essere caricati nel cluster HDInsight Spark e forniscono esempi pronti per l'uso con Azure Cosmos DB for Apache Cassandra. Assicurarsi di esaminare il file 1.0-ReadMe.ipynb
del primo notebook per verificare la configurazione del servizio Spark per la connessione ad Azure Cosmos DB for Apache Cassandra.
Scaricare i notebook da azure-cosmos-db-cassandra-api-spark-notebooks-jupyter nel computer.
Modalità di caricamento
Quando si avvia Jupyter, passare a Scala. Creare una directory e quindi caricare i notebook nella directory. Il pulsante Carica si trova in alto a destra.
Modalità di esecuzione
Esaminare i notebook e ogni cella dei notebook in sequenza. Fare clic sul pulsante Esegui nella parte superiore di ogni notebook per eseguire tutte le celle o premere MAIUSC+INVIO per eseguire le singole celle.
Accedere con Azure Cosmos DB for Apache Cassandra dal programma Spark Scala
Per i processi di produzione automatizzati, i programmi Spark vengono inviati al cluster tramite spark-submit.