Sdílet prostřednictvím


Operace Apache Sparku podporované konektorem Hive Warehouse ve službě Azure HDInsight

Tento článek ukazuje operace založené na sparku podporované konektorem Hive Warehouse Connector (HWC). Všechny zobrazené příklady se spustí prostřednictvím prostředí Apache Spark.

Požadavek

Dokončete kroky nastavení konektoru Hive Warehouse Connector.

Začínáme

Pokud chcete spustit relaci spark-shellu, proveďte následující kroky:

  1. Pomocí příkazu ssh se připojte ke clusteru Apache Spark. Upravte příkaz nahrazením clusteru názvem clusteru a zadáním příkazu:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. V relaci ssh spusťte následující příkaz, který si poznamenejte hive-warehouse-connector-assembly verzi:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Upravte kód s hive-warehouse-connector-assembly verzí uvedenou výše. Pak spuštěním příkazu spusťte prostředí Spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Po spuštění spark-shellu můžete instanci konektoru Hive Warehouse začít pomocí následujících příkazů:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Vytváření datových rámců Sparku pomocí dotazů Hive

Výsledky všech dotazů používajících knihovnu HWC se vrátí jako datový rámec. Následující příklady ukazují, jak vytvořit základní dotaz Hive.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Výsledky dotazu jsou datové rámce Sparku, které je možné použít s knihovnami Sparku, jako je MLIB a SparkSQL.

Zápis datových rámců Sparku do tabulek Hive

Spark nativně nepodporuje zápis do spravovaných tabulek ACID Hive. Pomocí HWC ale můžete do tabulky Hive zapisovat libovolný datový rámec. Tuto funkci si můžete prohlédnout v práci v následujícím příkladu:

  1. Vytvořte volanou sampletable_colorado tabulku a pomocí následujícího příkazu zadejte její sloupce:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Vyfiltrujte tabulku hivesampletable , ve které se sloupec state rovná Colorado. Tento dotaz Hive vrátí datový rámec Sparku a výsledek se uloží do tabulky sampletable_colorado Hive pomocí write funkce.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Výsledky zobrazíte pomocí následujícího příkazu:

    hive.table("sampletable_colorado").show()
    

    Konektor hive warehouse zobrazuje tabulku Hive.

Zápisy strukturovaného streamování

Pomocí konektoru Hive Warehouse můžete pomocí streamování Sparku zapisovat data do tabulek Hive.

Důležité

Zápisy strukturovaného streamování se nepodporují v clusterech Spark 4.0 s podporou ESP.

Postupujte podle pokynů k ingestování dat z datového proudu Sparku na portu localhost 9999 do tabulky Hive prostřednictvím. Konektor Hive Warehouse.

  1. V otevřeném prostředí Spark spusťte stream Spark pomocí následujícího příkazu:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Pomocí následujících kroků vygenerujte data pro datový proud Spark, který jste vytvořili:

    1. Otevřete druhou relaci SSH ve stejném clusteru Spark.
    2. Do příkazového řádku zadejte nc -lk 9999. Tento příkaz používá netcat nástroj k odesílání dat z příkazového řádku do zadaného portu.
  3. Vraťte se do první relace SSH a vytvořte novou tabulku Hive pro uložení streamovaných dat. V prostředí spark-shell zadejte následující příkaz:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Potom pomocí následujícího příkazu zapište streamovaná data do nově vytvořené tabulky:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Důležité

    Možnosti metastoreUri musí database být v současné době nastavené ručně kvůli známému problému v Apache Sparku. Další informace o tomto problému najdete v tématu SPARK-25460.

  5. Vraťte se do druhé relace SSH a zadejte následující hodnoty:

    foo
    HiveSpark
    bar
    
  6. Vraťte se do první relace SSH a poznamenejte si krátkou aktivitu. K zobrazení dat použijte následující příkaz:

    hive.table("stream_table").show()
    

K zastavení netcat druhé relace SSH použijte kombinaci kláves Ctrl+C. Slouží :q k ukončení spark-shellu v první relaci SSH.

Další kroky