Acceso a Azure Cosmos DB for Apache Cassandra desde Spark en YARN con HDInsight
SE APLICA A: Cassandra
En este artículo se explica cómo acceder a Azure Cosmos DB for Apache Cassandra desde Spark en YARN con HDInsight-Spark desde spark-shell
. HDInsight es un PaaS de Hortonworks Hadoop de Microsoft en Azure. Usa el almacenamiento de objetos para HDFS y se suministra con varios tipos, incluido Spark. Aunque en este artículo se hace referencia a HDInsight-Spark, se aplica a todas las distribuciones de Hadoop.
Requisitos previos
Antes de empezar, revise los conceptos básicos de la conexión a Azure Cosmos DB for Apache Cassandra.
Los siguientes requisitos previos son necesarios:
Aprovisionamiento de Azure Cosmos DB for Apache Cassandra. Consulte Creación de una cuenta de base de datos.
Aprovisionamiento de un clúster de HDInsight-Spark. Consulte Creación de un clúster de Apache Spark en Azure HDInsight mediante una plantilla de ARM.
Configuración de la API para Cassandra en Spark2. El conector de Spark para Cassandra requiere que los detalles de la conexión de Cassandra se inicialicen como parte del contexto de Spark. Al iniciar una instancia de Jupyter Notebook, la sesión y el contexto ya se inicializan. Continúe y reinicialice el contexto de Spark, a menos que se complete con cada configuración establecida como parte del inicio predeterminado de una instancia de Jupyter Notebook por parte de HDInsight. Una solución alternativa es agregar los detalles de la instancia de Cassandra directamente a la configuración del servicio Ambari, Spark2. Este enfoque es una actividad que se realiza una sola vez por clúster y que requiere que se reinicie el servicio Spark2.
Vaya al servicio Ambari, Spark2, y seleccione las configuraciones.
Luego, vaya a spark2-defaults personalizado y agregue una nueva propiedad con lo siguiente, y reinicie el servicio Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Puede usar cqlsh
para la validación. Para más información, consulte Conexión a Azure Cosmos DB for Apache Cassandra desde Spark.
Acceso a Azure Cosmos DB for Apache Cassandra desde el shell de Spark
El shell de Spark se usa para la prueba y exploración.
Inicie
spark-shell
con las dependencias de Maven necesarias, compatibles con la versión de Spark de su clúster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Ejecute algunas operaciones DDL y DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Ejecute operaciones CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Acceso a Azure Cosmos DB for Apache Cassandra desde cuadernos de Jupyter
HDInsight-Spark incluye los servicios Zeppelin y Jupyter Notebook. Ambos son entornos de cuadernos basados en web que admiten Scala y Python. Los cuadernos son excelentes para la colaboración y el análisis exploratorio interactivo, pero no están pensados para procesos operativos ni de producción.
Los siguientes cuadernos de Jupyter Notebook se pueden cargar en un clúster de HDInsight Spark y proporcionan ejemplos para trabajar con Azure Cosmos DB for Apache Cassandra. Asegúrese de consultar el primer cuaderno 1.0-ReadMe.ipynb
para revisar la configuración del servicio de Spark para conectarse a Azure Cosmos DB for Apache Cassandra.
Descargue los cuadernos de azure-cosmos-db-cassandra-api-spark-notebooks-jupyter en su equipo.
Procedimiento para la carga
Al iniciar Jupyter, vaya a Scala. Cree un directorio y, después, cargue los cuadernos en el directorio. El botón Cargar se encuentra en la parte superior derecha.
Procedimiento para la ejecución
Recorra los cuadernos, y cada celda de ellos, de manera secuencial. Seleccione el botón Ejecutar de la parte superior de cada cuaderno para ejecutar todas las celdas, o bien Mayús+Entrar para ejecutar cada una de ellas.
Acceso con Azure Cosmos DB for Apache Cassandra desde el programa Spark Scala
En los procesos automatizados en producción, los programas de Spark se envían al clúster a través de spark-submit.