Vom Hive Warehouse Connector unterstützte Apache Spark-Vorgänge in Azure HDInsight
In diesem Artikel werden Spark-basierte Vorgänge erläutert, die vom Hive Warehouse Connector (HWC) unterstützt werden. Alle gezeigten Beispiele werden über die Apache Spark-Shell ausgeführt.
Voraussetzungen
Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.
Erste Schritte
Führen Sie die folgenden Schritte aus, um eine spark-shell-Sitzung zu starten:
Verwenden Sie den Befehl ssh, um eine Verbindung mit Ihrem Apache Spark-Cluster herzustellen. Bearbeiten Sie den Befehl, indem Sie CLUSTERNAME durch den Namen Ihres Clusters ersetzen, und geben Sie den Befehl dann ein:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Führen Sie in Ihrer SSH-Sitzung den folgenden Befehl aus, und notieren Sie sich die Version von
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Bearbeiten Sie den Code, indem Sie die oben ermittelte Version für
hive-warehouse-connector-assembly
verwenden. Führen Sie anschließend den Befehl aus, um die Spark-Shell zu starten:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Nachdem die Spark-Shell gestartet wurde, kann mit den folgenden Befehlen eine Hive Warehouse Connector-Instanz gestartet werden:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Erstellen von Spark-Datenrahmen mithilfe von Hive-Abfragen
Die Ergebnisse aller Abfragen mit Verwendung der HWC-Bibliothek werden als Datenrahmen zurückgegeben. In den folgenden Beispielen wird veranschaulicht, wie Sie eine einfache Hive-Abfrage erstellen.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Bei den Ergebnissen der Abfrage handelt es sich um Spark-Datenrahmen, die mit Spark-Bibliotheken wie MLIB und SparkSQL verwendet werden können.
Schreiben von Spark-Datenrahmen in Hive-Tabellen
Spark bietet keine native Unterstützung für das Schreiben in verwaltete ACID-Tabellen von Hive. Mithilfe von HWC können Sie jedoch alle Datenrahmen in eine Hive-Tabelle schreiben. Diese Funktionalität wird im folgenden Beispiel veranschaulicht:
Erstellen Sie eine Tabelle mit dem Namen
sampletable_colorado
, und geben Sie Spalten dafür an, indem Sie den folgenden Befehl verwenden:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filtern Sie die Tabelle
hivesampletable
, in der die Spaltestate
den EintragColorado
enthält. Diese Hive-Abfrage gibt einen Spark-Datenrahmen zurück, und das Ergebnis wird mit der Funktionwrite
in der Hive-Tabellesampletable_colorado
gespeichert.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Zeigen Sie die Ergebnisse mit dem folgenden Befehl an:
hive.table("sampletable_colorado").show()
Schreibvorgänge per strukturiertem Stream
Mit Hive Warehouse Connector können Sie das Spark-Streaming nutzen, um Daten in Hive-Tabellen zu schreiben.
Wichtig
Strukturierte Streamingschreibvorgänge werden in ESP-fähigen Spark 4.0-Clustern nicht unterstützt.
Mithilfe der Schritte können Sie Daten aus einem Spark-Stream vom Localhost-Port 9999 abrufen und als Hive-Tabelle formatieren. (über den Hive Warehouse Connector).
Starten Sie in der geöffneten Spark-Shell mit dem folgenden Befehl einen Spark-Stream:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Generieren Sie Daten für den von Ihnen erstellten Spark-Stream, indem Sie die folgenden Schritte ausführen:
- Richten Sie im selben Spark-Cluster eine zweite SSH-Sitzung ein.
- Geben Sie an der Eingabeaufforderung
nc -lk 9999
ein: Bei diesem Befehl wird das Hilfsprogrammnetcat
verwendet, um Daten über die Befehlszeile an den angegebenen Port zu senden.
Kehren Sie zur ersten SSH-Sitzung zurück, und erstellen Sie für die Streamingdaten eine neue Hive-Tabelle. Geben Sie in spark-shell den folgenden Befehl ein:
hive.createTable("stream_table").column("value","string").create()
Schreiben Sie anschließend die Streamingdaten mit dem folgenden Befehl in die neu erstellte Tabelle:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Wichtig
Aufgrund eines bekannten Problems in Apache Spark müssen die Optionen
metastoreUri
unddatabase
derzeit manuell festgelegt werden. Weitere Informationen zu diesem Problem finden Sie unter SPARK-25460.Kehren Sie zur zweiten SSH-Sitzung zurück, und geben Sie die folgenden Werte ein:
foo HiveSpark bar
Kehren Sie zur ersten SSH-Sitzung zurück, und beachten Sie die kurze Aktivität. Verwenden Sie zum Anzeigen der Daten den folgenden Befehl:
hive.table("stream_table").show()
Drücken Sie STRG+C, um netcat
in der zweiten SSH-Sitzung zu beenden. Verwenden Sie :q
, um spark-shell in der ersten SSH-Sitzung zu beenden.