Udostępnij za pośrednictwem


Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark w usłudze YARN przy użyciu usługi HDInsight

DOTYCZY: Kasandra

W tym artykule opisano sposób uzyskiwania dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark na platformie YARN przy użyciu usługi HDInsight-Spark z usługi spark-shell. HDInsight to platforma Hortonworks Hadoop PaaS firmy Microsoft na platformie Azure. Korzysta z magazynu obiektów dla systemu plików HDFS i jest dostępny w kilku wersjach, w tym Spark. Chociaż ten artykuł odnosi się do usługi HDInsight-Spark, dotyczy wszystkich dystrybucji hadoop.

Wymagania wstępne

Przed rozpoczęciem zapoznaj się z podstawami nawiązywania połączenia z usługą Azure Cosmos DB dla usługi Apache Cassandra.

Potrzebne są następujące wymagania wstępne:

  • Aprowizuj usługę Azure Cosmos DB dla bazy danych Apache Cassandra. Zobacz Tworzenie konta bazy danych.

  • Aprowizuj klaster HDInsight-Spark. Zobacz Tworzenie klastra Apache Spark w usłudze Azure HDInsight przy użyciu szablonu usługi ARM.

  • Interfejs API dla konfiguracji bazy danych Cassandra na platformie Spark2. Łącznik Spark dla rozwiązania Cassandra wymaga zainicjowania szczegółów połączenia Cassandra w ramach kontekstu platformy Spark. Po uruchomieniu notesu Jupyter sesja i kontekst platformy Spark są już inicjowane. Nie zatrzymaj i ponownie zainicjuj kontekstu platformy Spark, chyba że zostanie on ukończony z każdym zestawem konfiguracji w ramach domyślnego uruchamiania notesu Jupyter usługi HDInsight. Jednym z obejść jest dodanie szczegółów wystąpienia cassandra do konfiguracji usługi Ambari, Spark2 bezpośrednio. Takie podejście jest jednorazowym działaniem na klaster, który wymaga ponownego uruchomienia usługi Spark2.

    1. Przejdź do pozycji Ambari, Spark2 service i wybierz pozycję configs (Konfiguracje).

    2. Przejdź do niestandardowych wartości spark2-defaults i dodaj nową właściwość z następującymi elementami i uruchom ponownie usługę Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Do weryfikacji można użyć cqlsh polecenia . Aby uzyskać więcej informacji, zobacz Connecting to Azure Cosmos DB for Apache Cassandra from Spark (Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra na platformie Spark).

Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu powłoki Spark

Powłoka Spark jest używana do testowania i eksploracji.

  • Uruchom program spark-shell z wymaganymi zależnościami maven zgodnymi z wersją platformy Spark klastra.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Wykonywanie niektórych operacji DDL i DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Uruchamianie operacji CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z notesów Jupyter

Usługa HDInsight-Spark jest dostarczana z usługami notesów Zeppelin i Jupyter. Są to środowiska notesów internetowych, które obsługują język Scala i Python. Notesy doskonale nadają się do interaktywnej analizy eksploracyjnej i współpracy, ale nie są przeznaczone dla procesów operacyjnych ani produkcyjnych.

Następujące notesy Jupyter można przekazać do klastra HDInsight Spark i udostępnić gotowe przykłady do pracy z usługą Azure Cosmos DB dla systemu Apache Cassandra. Zapoznaj się z pierwszym notesem 1.0-ReadMe.ipynb , aby zapoznać się z konfiguracją usługi Spark na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla systemu Apache Cassandra.

Pobierz notesy w obszarze azure-cosmos-db-cassandra-api-spark-notebooks-jupyter na maszynę.

Jak przekazać

Po uruchomieniu programu Jupyter przejdź do języka Scala. Utwórz katalog, a następnie przekaż notesy do katalogu. Przycisk Przekaż znajduje się w prawym górnym rogu.

Jak uruchomić

Przejrzyj notesy i poszczególne komórki notesu sekwencyjnie. Wybierz przycisk Uruchom w górnej części każdego notesu, aby uruchomić wszystkie komórki, lub Shift+Enter dla każdej komórki.

Dostęp za pomocą usługi Azure Cosmos DB for Apache Cassandra z poziomu programu Spark Scala

W przypadku zautomatyzowanych procesów w środowisku produkcyjnym programy Spark są przesyłane do klastra przy użyciu funkcji spark-submit.

Następne kroki