Operace Apache Sparku podporované konektorem Hive Warehouse ve službě Azure HDInsight
Tento článek ukazuje operace založené na sparku podporované konektorem Hive Warehouse Connector (HWC). Všechny zobrazené příklady se spustí prostřednictvím prostředí Apache Spark.
Požadavek
Dokončete kroky nastavení konektoru Hive Warehouse Connector.
Začínáme
Pokud chcete spustit relaci spark-shellu, proveďte následující kroky:
Pomocí příkazu ssh se připojte ke clusteru Apache Spark. Upravte příkaz nahrazením clusteru názvem clusteru a zadáním příkazu:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
V relaci ssh spusťte následující příkaz, který si poznamenejte
hive-warehouse-connector-assembly
verzi:ls /usr/hdp/current/hive_warehouse_connector
Upravte kód s
hive-warehouse-connector-assembly
verzí uvedenou výše. Pak spuštěním příkazu spusťte prostředí Spark:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Po spuštění spark-shellu můžete instanci konektoru Hive Warehouse začít pomocí následujících příkazů:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Vytváření datových rámců Sparku pomocí dotazů Hive
Výsledky všech dotazů používajících knihovnu HWC se vrátí jako datový rámec. Následující příklady ukazují, jak vytvořit základní dotaz Hive.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Výsledky dotazu jsou datové rámce Sparku, které je možné použít s knihovnami Sparku, jako je MLIB a SparkSQL.
Zápis datových rámců Sparku do tabulek Hive
Spark nativně nepodporuje zápis do spravovaných tabulek ACID Hive. Pomocí HWC ale můžete do tabulky Hive zapisovat libovolný datový rámec. Tuto funkci si můžete prohlédnout v práci v následujícím příkladu:
Vytvořte volanou
sampletable_colorado
tabulku a pomocí následujícího příkazu zadejte její sloupce:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Vyfiltrujte tabulku
hivesampletable
, ve které se sloupecstate
rovnáColorado
. Tento dotaz Hive vrátí datový rámec Sparku a výsledek se uloží do tabulkysampletable_colorado
Hive pomocíwrite
funkce.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Výsledky zobrazíte pomocí následujícího příkazu:
hive.table("sampletable_colorado").show()
Zápisy strukturovaného streamování
Pomocí konektoru Hive Warehouse můžete pomocí streamování Sparku zapisovat data do tabulek Hive.
Důležité
Zápisy strukturovaného streamování se nepodporují v clusterech Spark 4.0 s podporou ESP.
Postupujte podle pokynů k ingestování dat z datového proudu Sparku na portu localhost 9999 do tabulky Hive prostřednictvím. Konektor Hive Warehouse.
V otevřeném prostředí Spark spusťte stream Spark pomocí následujícího příkazu:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Pomocí následujících kroků vygenerujte data pro datový proud Spark, který jste vytvořili:
- Otevřete druhou relaci SSH ve stejném clusteru Spark.
- Do příkazového řádku zadejte
nc -lk 9999
. Tento příkaz používánetcat
nástroj k odesílání dat z příkazového řádku do zadaného portu.
Vraťte se do první relace SSH a vytvořte novou tabulku Hive pro uložení streamovaných dat. V prostředí spark-shell zadejte následující příkaz:
hive.createTable("stream_table").column("value","string").create()
Potom pomocí následujícího příkazu zapište streamovaná data do nově vytvořené tabulky:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Důležité
Možnosti
metastoreUri
musídatabase
být v současné době nastavené ručně kvůli známému problému v Apache Sparku. Další informace o tomto problému najdete v tématu SPARK-25460.Vraťte se do druhé relace SSH a zadejte následující hodnoty:
foo HiveSpark bar
Vraťte se do první relace SSH a poznamenejte si krátkou aktivitu. K zobrazení dat použijte následující příkaz:
hive.table("stream_table").show()
K zastavení netcat
druhé relace SSH použijte kombinaci kláves Ctrl+C. Slouží :q
k ukončení spark-shellu v první relaci SSH.