Zugreifen auf Azure Cosmos DB for Apache Cassandra in Spark für YARN mit HDInsight
GILT FÜR: Cassandra
In diesem Artikel wird der Zugriff auf Azure Cosmos DB for Apache Cassandra in Spark für YARN mit HDInsight Spark über spark-shell
behandelt. HDInsight ist der Hortonworks Hadoop-PaaS von Microsoft in Azure. Er verwendet Objektspeicher für HDFS und ist in mehreren Varianten verfügbar, z. B. Spark. Obwohl sich dieser Artikel auf HDInsight-Spark bezieht, gilt er für alle Hadoop-Verteilungen.
Voraussetzungen
Überprüfen Sie die Grundlagen des Herstellens der Verbindung mit Azure Cosmos DB for Apache Cassandra, bevor Sie beginnen.
Die folgenden Voraussetzungen müssen erfüllt sein:
Stellen Sie Azure Cosmos DB for Apache Cassandra bereit. Siehe Erstellen eines Datenbankkontos.
Ein HDInsight Spark-Cluster muss bereitgestellt sein. Siehe Erstellen eines Apache Spark-Clusters in Azure HDInsight mithilfe einer ARM-Vorlage.
API für Cassandra-Konfiguration in Spark2. Für den Spark-Connector für Cassandra müssen die Details der Cassandra-Verbindung als Teil des Spark-Kontexts initialisiert werden. Wenn Sie ein Jupyter-Notebook starten, sind Spark-Sitzung und Kontext bereits initialisiert. Beenden Sie den Spark-Kontext und initialisieren Sie ihn erst dann neu, wenn er mit allen Konfigurationseinstellungen für den Standardstart von Jupyter-Notebooks in HDInsight vollständig ist. Eine Problemumgehung ist, die Details der Cassandra-Instanz direkt zur Dienstkonfiguration von Spark2 mit Ambari hinzuzufügen. Diese Vorgehensweise ist eine einmalige Aktivität pro Cluster, die einen Neustart des Spark2-Diensts erfordert.
Navigieren Sie zum Spark2-Dienst mit Ambari, und wählen Sie die Konfigurationen aus.
Navigieren Sie zu „spark2-defaults“, fügen Sie eine neue Eigenschaft mit dem folgenden Code hinzu, und starten Sie den Spark2-Dienst neu:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Sie können cqlsh
zur Überprüfung verwenden. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit Azure Cosmos DB for Apache Cassandra in Spark.
Zugreifen auf Azure Cosmos DB for Apache Cassandra über die Spark-Shell
Die Spark-Shell wird für Tests und Untersuchungen verwendet.
Starten Sie
spark-shell
mit den erforderlichen Maven-Abhängigkeiten, die mit der Spark-Version Ihres Clusters kompatibel sind.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Führen Sie einige DDL- und DML-Vorgänge aus:
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Führen Sie CRUD-Vorgänge aus:
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Zugreifen auf Azure Cosmos DB for Apache Cassandra über Jupyter-Notebooks
HDInsight Spark beinhaltet die Dienste Zeppelin und Jupyter Notebook. Hierbei handelt es sich um webbasierte Notebookumgebungen, die Scala und Python unterstützen. Notebooks sind hervorragend für die interaktive explorative Analyse und Zusammenarbeit geeignet, jedoch nicht für Betriebs- bzw. Produktionsprozesse vorgesehen.
Die folgenden Jupyter-Notebooks können in Ihre HDInsight Spark-Cluster hochgeladen werden und bieten sofort nutzbare Beispiele für das Arbeiten mit Azure Cosmos DB for Apache Cassandra. Lesen Sie unbedingt das erste Notebook 1.0-ReadMe.ipynb
zur Spark-Dienstkonfiguration, um eine Verbindung mit Azure Cosmos DB for Apache Cassandra herzustellen.
Laden Sie die Notebooks unter azure-cosmos-db-cassandra-api-spark-notebooks-jupyter auf Ihrem Computer herunter.
Informationen zum Upload
Navigieren Sie zu Scala, nachdem Sie Jupyter gestartet haben. Erstellen Sie ein Verzeichnis, und laden Sie die Notebooks in das Verzeichnis hoch. Die Schaltfläche zum Hochladen befindet sich am oberen Rand auf der rechten Seite.
Informationen zur Ausführung
Gehen Sie nacheinander die Notebooks und die einzelnen Notebookzellen durch. Wählen Sie die Schaltfläche zum Ausführen am oberen Rand jedes Notebooks aus, um alle Zellen auszuführen, oder drücken Sie für jede Zelle die UMSCHALT+EINGABETASTE.
Zugreifen mit Azure Cosmos DB for Apache Cassandra über Ihr Spark Scala-Programm
Bei automatisierten Prozessen in der Produktion werden Spark-Programme mit spark-submit an den Cluster übermittelt.