Freigeben über


Vom Hive Warehouse Connector unterstützte Apache Spark-Vorgänge in Azure HDInsight

In diesem Artikel werden Spark-basierte Vorgänge erläutert, die vom Hive Warehouse Connector (HWC) unterstützt werden. Alle gezeigten Beispiele werden über die Apache Spark-Shell ausgeführt.

Voraussetzungen

Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.

Erste Schritte

Führen Sie die folgenden Schritte aus, um eine spark-shell-Sitzung zu starten:

  1. Verwenden Sie den Befehl ssh, um eine Verbindung mit Ihrem Apache Spark-Cluster herzustellen. Bearbeiten Sie den Befehl, indem Sie CLUSTERNAME durch den Namen Ihres Clusters ersetzen, und geben Sie den Befehl dann ein:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Führen Sie in Ihrer SSH-Sitzung den folgenden Befehl aus, und notieren Sie sich die Version von hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Bearbeiten Sie den Code, indem Sie die oben ermittelte Version für hive-warehouse-connector-assembly verwenden. Führen Sie anschließend den Befehl aus, um die Spark-Shell zu starten:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Nachdem die Spark-Shell gestartet wurde, kann mit den folgenden Befehlen eine Hive Warehouse Connector-Instanz gestartet werden:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Erstellen von Spark-Datenrahmen mithilfe von Hive-Abfragen

Die Ergebnisse aller Abfragen mit Verwendung der HWC-Bibliothek werden als Datenrahmen zurückgegeben. In den folgenden Beispielen wird veranschaulicht, wie Sie eine einfache Hive-Abfrage erstellen.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Bei den Ergebnissen der Abfrage handelt es sich um Spark-Datenrahmen, die mit Spark-Bibliotheken wie MLIB und SparkSQL verwendet werden können.

Schreiben von Spark-Datenrahmen in Hive-Tabellen

Spark bietet keine native Unterstützung für das Schreiben in verwaltete ACID-Tabellen von Hive. Mithilfe von HWC können Sie jedoch alle Datenrahmen in eine Hive-Tabelle schreiben. Diese Funktionalität wird im folgenden Beispiel veranschaulicht:

  1. Erstellen Sie eine Tabelle mit dem Namen sampletable_colorado, und geben Sie Spalten dafür an, indem Sie den folgenden Befehl verwenden:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filtern Sie die Tabelle hivesampletable, in der die Spalte state den Eintrag Colorado enthält. Diese Hive-Abfrage gibt einen Spark-Datenrahmen zurück, und das Ergebnis wird mit der Funktion write in der Hive-Tabelle sampletable_colorado gespeichert.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Zeigen Sie die Ergebnisse mit dem folgenden Befehl an:

    hive.table("sampletable_colorado").show()
    

    Hive Warehouse Connector zeigt die Hive-Tabelle.

Schreibvorgänge per strukturiertem Stream

Mit Hive Warehouse Connector können Sie das Spark-Streaming nutzen, um Daten in Hive-Tabellen zu schreiben.

Wichtig

Strukturierte Streamingschreibvorgänge werden in ESP-fähigen Spark 4.0-Clustern nicht unterstützt.

Mithilfe der Schritte können Sie Daten aus einem Spark-Stream vom Localhost-Port 9999 abrufen und als Hive-Tabelle formatieren. (über den Hive Warehouse Connector).

  1. Starten Sie in der geöffneten Spark-Shell mit dem folgenden Befehl einen Spark-Stream:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Generieren Sie Daten für den von Ihnen erstellten Spark-Stream, indem Sie die folgenden Schritte ausführen:

    1. Richten Sie im selben Spark-Cluster eine zweite SSH-Sitzung ein.
    2. Geben Sie an der Eingabeaufforderung nc -lk 9999 ein: Bei diesem Befehl wird das Hilfsprogramm netcat verwendet, um Daten über die Befehlszeile an den angegebenen Port zu senden.
  3. Kehren Sie zur ersten SSH-Sitzung zurück, und erstellen Sie für die Streamingdaten eine neue Hive-Tabelle. Geben Sie in spark-shell den folgenden Befehl ein:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Schreiben Sie anschließend die Streamingdaten mit dem folgenden Befehl in die neu erstellte Tabelle:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Wichtig

    Aufgrund eines bekannten Problems in Apache Spark müssen die Optionen metastoreUri und database derzeit manuell festgelegt werden. Weitere Informationen zu diesem Problem finden Sie unter SPARK-25460.

  5. Kehren Sie zur zweiten SSH-Sitzung zurück, und geben Sie die folgenden Werte ein:

    foo
    HiveSpark
    bar
    
  6. Kehren Sie zur ersten SSH-Sitzung zurück, und beachten Sie die kurze Aktivität. Verwenden Sie zum Anzeigen der Daten den folgenden Befehl:

    hive.table("stream_table").show()
    

Drücken Sie STRG+C, um netcat in der zweiten SSH-Sitzung zu beenden. Verwenden Sie :q, um spark-shell in der ersten SSH-Sitzung zu beenden.

Nächste Schritte