Freigeben über


Zugreifen auf Azure Cosmos DB for Apache Cassandra in Spark für YARN mit HDInsight

GILT FÜR: Cassandra

In diesem Artikel wird der Zugriff auf Azure Cosmos DB for Apache Cassandra in Spark für YARN mit HDInsight Spark über spark-shell behandelt. HDInsight ist der Hortonworks Hadoop-PaaS von Microsoft in Azure. Er verwendet Objektspeicher für HDFS und ist in mehreren Varianten verfügbar, z. B. Spark. Obwohl sich dieser Artikel auf HDInsight-Spark bezieht, gilt er für alle Hadoop-Verteilungen.

Voraussetzungen

Überprüfen Sie die Grundlagen des Herstellens der Verbindung mit Azure Cosmos DB for Apache Cassandra, bevor Sie beginnen.

Die folgenden Voraussetzungen müssen erfüllt sein:

  • Stellen Sie Azure Cosmos DB for Apache Cassandra bereit. Siehe Erstellen eines Datenbankkontos.

  • Ein HDInsight Spark-Cluster muss bereitgestellt sein. Siehe Erstellen eines Apache Spark-Clusters in Azure HDInsight mithilfe einer ARM-Vorlage.

  • API für Cassandra-Konfiguration in Spark2. Für den Spark-Connector für Cassandra müssen die Details der Cassandra-Verbindung als Teil des Spark-Kontexts initialisiert werden. Wenn Sie ein Jupyter-Notebook starten, sind Spark-Sitzung und Kontext bereits initialisiert. Beenden Sie den Spark-Kontext und initialisieren Sie ihn erst dann neu, wenn er mit allen Konfigurationseinstellungen für den Standardstart von Jupyter-Notebooks in HDInsight vollständig ist. Eine Problemumgehung ist, die Details der Cassandra-Instanz direkt zur Dienstkonfiguration von Spark2 mit Ambari hinzuzufügen. Diese Vorgehensweise ist eine einmalige Aktivität pro Cluster, die einen Neustart des Spark2-Diensts erfordert.

    1. Navigieren Sie zum Spark2-Dienst mit Ambari, und wählen Sie die Konfigurationen aus.

    2. Navigieren Sie zu „spark2-defaults“, fügen Sie eine neue Eigenschaft mit dem folgenden Code hinzu, und starten Sie den Spark2-Dienst neu:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Sie können cqlsh zur Überprüfung verwenden. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit Azure Cosmos DB for Apache Cassandra in Spark.

Zugreifen auf Azure Cosmos DB for Apache Cassandra über die Spark-Shell

Die Spark-Shell wird für Tests und Untersuchungen verwendet.

  • Starten Sie spark-shell mit den erforderlichen Maven-Abhängigkeiten, die mit der Spark-Version Ihres Clusters kompatibel sind.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Führen Sie einige DDL- und DML-Vorgänge aus:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Führen Sie CRUD-Vorgänge aus:

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Zugreifen auf Azure Cosmos DB for Apache Cassandra über Jupyter-Notebooks

HDInsight Spark beinhaltet die Dienste Zeppelin und Jupyter Notebook. Hierbei handelt es sich um webbasierte Notebookumgebungen, die Scala und Python unterstützen. Notebooks sind hervorragend für die interaktive explorative Analyse und Zusammenarbeit geeignet, jedoch nicht für Betriebs- bzw. Produktionsprozesse vorgesehen.

Die folgenden Jupyter-Notebooks können in Ihre HDInsight Spark-Cluster hochgeladen werden und bieten sofort nutzbare Beispiele für das Arbeiten mit Azure Cosmos DB for Apache Cassandra. Lesen Sie unbedingt das erste Notebook 1.0-ReadMe.ipynb zur Spark-Dienstkonfiguration, um eine Verbindung mit Azure Cosmos DB for Apache Cassandra herzustellen.

Laden Sie die Notebooks unter azure-cosmos-db-cassandra-api-spark-notebooks-jupyter auf Ihrem Computer herunter.

Informationen zum Upload

Navigieren Sie zu Scala, nachdem Sie Jupyter gestartet haben. Erstellen Sie ein Verzeichnis, und laden Sie die Notebooks in das Verzeichnis hoch. Die Schaltfläche zum Hochladen befindet sich am oberen Rand auf der rechten Seite.

Informationen zur Ausführung

Gehen Sie nacheinander die Notebooks und die einzelnen Notebookzellen durch. Wählen Sie die Schaltfläche zum Ausführen am oberen Rand jedes Notebooks aus, um alle Zellen auszuführen, oder drücken Sie für jede Zelle die UMSCHALT+EINGABETASTE.

Zugreifen mit Azure Cosmos DB for Apache Cassandra über Ihr Spark Scala-Programm

Bei automatisierten Prozessen in der Produktion werden Spark-Programme mit spark-submit an den Cluster übermittelt.

Nächste Schritte