Acesse o Azure Cosmos DB for Apache Cassandra do Spark no YARN com HDInsight
APLICA-SE AO: Cassandra
Este artigo aborda como acessar o Azure Cosmos DB for Apache Cassandra do Spark no YARN com HDInsight-Spark do spark-shell
. O HDInsight é o Hortonworks Hadoop PaaS da Microsoft no Azure. Ele usa o armazenamento de objetos para HDFS e vem em vários tipos, incluindo Spark. Embora este artigo se refira ao HDInsight-Spark, ele se aplica a todas as distribuições do Hadoop.
Pré-requisitos
Antes de começar, examine os conceitos básicos de se conectar ao Azure Cosmos DB for Apache Cassandra.
Você precisa dos pré-requisitos a seguir:
Provisione o Azure Cosmos DB for Apache Cassandra. Confira Criar uma conta de banco de dados.
Provisionar um cluster do HDInsight-Spark. Confira Criar um cluster do Apache Spark no Azure HDInsight usando um modelo do Resource Manager.
Configuração da API para Cassandra no Spark2. O conector do Spark para Cassandra exige que os detalhes da conexão do Cassandra sejam inicializados como parte do contexto do Spark. Quando você inicia um Jupyter Notebook, a sessão e o contexto do Spark já são inicializados também. Não é aconselhável parar e reinicializar o contexto do Spark, a menos que esteja completo com cada configuração definida como parte da inicialização do Jupyter Notebook padrão do HDInsight. Uma solução alternativa é incluir os detalhes da instância do Cassandra na configuração do serviço Ambari, Spark2 diretamente. Esta abordagem é uma atividade única por cluster que requer uma reinicialização do serviço Spark2.
Vá para o Ambari, serviço Spark2, e selecione configurações.
Vá para custom spark2-defaults e inclua uma nova propriedade com o seguinte e reinicie o serviço Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Você pode usar cqlsh
para validação. Para obter mais informações, consulte Conectando-se ao Azure Cosmos DB for Apache Cassandra do Spark.
Acesse o Azure Cosmos DB for Apache Cassandra do shell do Spark
O shell do Spark é usado para teste e exploração.
Inicie
spark-shell
com as dependências do maven necessárias, compatíveis com a versão do Spark do seu cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Execute algumas operações de DDL e DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Executar operações CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Acesse o Azure Cosmos DB for Apache Cassandra em notebooks Jupyter
HDInsight Spark vem com os serviços de bloco de anotações do Zeppelin e Jupyter. Eles são ambos os ambientes de notebook baseado na Web que dão suporte ao Scala e ao Python. Os Notebooks são ótimos para análises exploratórias interativas e colaboração, mas não se destinam a processos operacionais/produtivos.
Os seguintes notebooks Jupyter podem ser carregados em seu cluster HDInsight Spark e fornecem amostras prontas para trabalhar com o Azure Cosmos DB for Apache Cassandra. Examine o primeiro notebook 1.0-ReadMe.ipynb
para examinar a configuração de serviço do Spark para conectar-se ao Azure Cosmos DB for Apache Cassandra.
Baixe os Notebooks sob azure-cosmos-db-cassandra-api-spark-notebooks-jupyter para o seu computador.
Como carregar
Ao iniciar o Jupyter, navegue até o Scala. Crie um diretório e carregue os Notebooks nele. O botão Carregar está no canto superior direito.
Como executar
Acesse os Notebooks em sequência, por meio de cada célula de Notebook existente. Clique no botão Executar, na parte superior de cada Notebook, para executar todas as células, ou em Shift+Enter para executar cada célula.
Acesse com o Azure Cosmos DB for Apache Cassandra do seu programa Spark Scala
Para processos automatizados em produção, os programas do Spark são enviados ao cluster por meio do spark-submit.