Sdílet prostřednictvím


Přístup ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku v YARN s využitím HDInsightu

PLATÍ PRO: Cassandra

Tento článek popisuje, jak získat přístup ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku v YARN s HDInsight-Sparkem z spark-shell. HDInsight je Hortonworks Hadoop PaaS od Microsoftu v Azure. Používá úložiště objektů pro HDFS a dodává se v několika variantách, včetně Sparku. I když se tento článek týká HDInsight-Sparku, platí pro všechny distribuce Hadoopu.

Požadavky

Než začnete, projděte si základy připojení ke službě Azure Cosmos DB pro Apache Cassandra.

Potřebujete následující požadavky:

  • Zřízení služby Azure Cosmos DB pro Apache Cassandra Viz Vytvoření databázového účtu.

  • Zřízení clusteru HDInsight-Spark Viz Vytvoření clusteru Apache Spark ve službě Azure HDInsight pomocí šablony ARM.

  • Konfigurace rozhraní API pro Cassandra ve Spark2 Konektor Spark pro Cassandra vyžaduje, aby se podrobnosti o připojení Cassandra inicializovaly jako součást kontextu Sparku. Když spustíte poznámkový blok Jupyter, relace Sparku a kontext se už inicializují. Nezastavujte a znovu inicializujte kontext Sparku, pokud není kompletní s každou sadou konfigurace v rámci výchozího spuštění poznámkového bloku Jupyter hdInsight. Jedním z alternativních řešení je přímé přidání podrobností instance Cassandra do konfigurace služby Ambari, Spark2. Tento přístup je jednorázová aktivita na cluster, který vyžaduje restartování služby Spark2.

    1. Přejděte do služby Ambari, Spark2 a vyberte konfigurace.

    2. Přejděte do vlastních výchozích hodnot Spark2 a přidejte novou vlastnost s následujícím kódem a restartujte službu Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

K ověření můžete použít cqlsh . Další informace najdete v tématu Připojení ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku.

Přístup ke službě Azure Cosmos DB pro Apache Cassandra z prostředí Spark

Prostředí Spark se používá k testování a zkoumání.

  • Spusťte spark-shell s požadovanými závislostmi Mavenu kompatibilními s verzí Sparku vašeho clusteru.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Provádění některých operací DDL a DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Spuštění operací CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Přístup ke službě Azure Cosmos DB for Apache Cassandra z poznámkových bloků Jupyter

HDInsight-Spark se dodává se službami Zeppelin a Jupyter Notebook. Jedná se o webová prostředí poznámkových bloků, která podporují Scala i Python. Poznámkové bloky jsou skvělé pro interaktivní průzkumnou analýzu a spolupráci, ale nejsou určené pro provozní nebo produkční procesy.

Následující poznámkové bloky Jupyter je možné nahrát do clusteru HDInsight Spark a poskytnout připravené ukázky pro práci se službou Azure Cosmos DB pro Apache Cassandra. Nezapomeňte si projít první poznámkový blok 1.0-ReadMe.ipynb a zkontrolovat konfiguraci služby Spark pro připojení ke službě Azure Cosmos DB pro Apache Cassandra.

Stáhněte si poznámkové bloky ve službě azure-cosmos-db-cassandra-api-spark-notebooks-jupyter do počítače.

Jak nahrát

Když spustíte Jupyter, přejděte na Scala. Vytvořte adresář a pak nahrajte poznámkové bloky do adresáře. Tlačítko Nahrát je v pravém horním rohu.

Postup spuštění

Projděte si poznámkové bloky a jednotlivé buňky poznámkového bloku postupně. Vyberte tlačítko Spustit v horní části každého poznámkového bloku, aby se spustily všechny buňky, nebo shift+Enter pro každou buňku.

Přístup pomocí služby Azure Cosmos DB for Apache Cassandra z programu Spark Scala

V případě automatizovaných procesů v produkčním prostředí se programy Sparku odesílají do clusteru pomocí spark-submit.

Další kroky