Få åtkomst till Azure Cosmos DB för Apache Cassandra från Spark på YARN med HDInsight
GÄLLER FÖR: Kassandra
Den här artikeln beskriver hur du får åtkomst till Azure Cosmos DB för Apache Cassandra från Spark på YARN med HDInsight-Spark från spark-shell
. HDInsight är Microsofts Hortonworks Hadoop PaaS på Azure. Den använder objektlagring för HDFS och finns i flera varianter, inklusive Spark. Även om den här artikeln refererar till HDInsight-Spark gäller den för alla Hadoop-distributioner.
Förutsättningar
Innan du börjar bör du läsa grunderna för att ansluta till Azure Cosmos DB för Apache Cassandra.
Du behöver följande förutsättningar:
Etablera Azure Cosmos DB för Apache Cassandra. Se Skapa ett databaskonto.
Etablera ett HDInsight-Spark-kluster. Se Skapa Apache Spark-kluster i Azure HDInsight med arm-mall.
API för Cassandra-konfiguration i Spark2. Spark-anslutningsappen för Cassandra kräver att Cassandra-anslutningsinformationen initieras som en del av Spark-kontexten. När du startar en Jupyter-notebook-fil initieras spark-sessionen och kontexten redan. Stoppa och initiera inte Spark-kontexten igen om den inte är komplett med alla konfigurationsuppsättningar som en del av HDInsight-standardstarten för Jupyter Notebook. En lösning är att lägga till information om Cassandra-instansen i Ambari, Spark2-tjänstkonfiguration, direkt. Den här metoden är en engångsaktivitet per kluster som kräver en omstart av Spark2-tjänsten.
Gå till tjänsten Ambari, Spark2 och välj konfigurationer.
Gå till anpassade spark2-standardvärden och lägg till en ny egenskap med följande och starta om Spark2-tjänsten:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Du kan använda cqlsh
för validering. Mer information finns i Ansluta till Azure Cosmos DB för Apache Cassandra från Spark.
Få åtkomst till Azure Cosmos DB för Apache Cassandra från Spark-gränssnittet
Spark Shell används för testning och utforskning.
Starta
spark-shell
med nödvändiga maven-beroenden som är kompatibla med ditt klusters Spark-version.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Köra vissa DDL- och DML-åtgärder
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Köra CRUD-åtgärder
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Få åtkomst till Azure Cosmos DB för Apache Cassandra från Jupyter Notebooks
HDInsight-Spark levereras med Zeppelin- och Jupyter Notebook-tjänster. De är båda webbaserade notebook-miljöer som stöder Scala och Python. Notebook-filer är bra för interaktiv undersökande analys och samarbete, men inte avsedda för drift- eller produktionsprocesser.
Följande Jupyter-notebook-filer kan laddas upp till ditt HDInsight Spark-kluster och tillhandahålla färdiga exempel för att arbeta med Azure Cosmos DB för Apache Cassandra. Se till att granska den första notebook-filen 1.0-ReadMe.ipynb
för att granska Spark-tjänstkonfigurationen för anslutning till Azure Cosmos DB för Apache Cassandra.
Ladda ned notebook-filerna under azure-cosmos-db-cassandra-api-spark-notebooks-jupyter till datorn.
Ladda upp
När du startar Jupyter går du till Scala. Skapa en katalog och ladda sedan upp anteckningsböckerna till katalogen. Knappen Ladda upp finns längst upp till höger.
Så här kör du
Gå igenom notebook-filerna och varje notebook-cell sekventiellt. Välj knappen Kör överst i varje anteckningsbok för att köra alla celler eller Skift+Retur för varje cell.
Åtkomst med Azure Cosmos DB för Apache Cassandra från ditt Spark Scala-program
För automatiserade processer i produktion skickas Spark-program till klustret med hjälp av spark-submit.