Operazioni di Apache Spark supportate da Hive Warehouse Connector in Azure HDInsight
Questo articolo illustra le operazioni basate su Spark supportate da Hive Warehouse Connector (HWC). Tutti gli esempi mostrati verranno eseguiti tramite la shell apache Spark.
Prerequisito
Completare la procedura di configurazione di Hive Warehouse Connector.
Introduzione
Per avviare una sessione della shell Spark, seguire questa procedura:
Usare il comando ssh per connettersi al cluster Apache Spark. Modificare il comando sostituendo CLUSTERNAME con il nome del cluster e quindi immettere il comando :
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Nella sessione ssh eseguire il comando seguente per prendere nota della versione di
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Modificare il codice con la
hive-warehouse-connector-assembly
versione identificata in precedenza. Eseguire quindi il comando per avviare la shell Spark:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Dopo aver avviato spark-shell, è possibile avviare un'istanza di Hive Warehouse Connector usando i comandi seguenti:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Creazione di DataFrame Spark con query Hive
I risultati di tutte le query che usano la libreria HWC vengono restituiti come DataFrame. Gli esempi seguenti illustrano come creare una query Hive di base.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
I risultati della query sono DataFrame Spark, che possono essere usati con le librerie Spark come MLIB e SparkSQL.
Scrittura di DataFrame Spark nelle tabelle Hive
Spark non supporta in modo nativo la scrittura nelle tabelle ACID gestite di Hive. Tuttavia, usando HWC, è possibile scrivere qualsiasi dataframe in una tabella Hive. È possibile vedere come opera questa funzionalità nell'esempio seguente:
Creare una tabella denominata
sampletable_colorado
e specificarne le colonne usando il comando seguente:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filtrare la tabella
hivesampletable
dove la colonnastate
è uguale aColorado
. Questa query hive restituisce un dataframe Spark e il risultato viene salvato nella tabellasampletable_colorado
Hive usando lawrite
funzione .hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Verificare i risultati con il comando seguente:
hive.table("sampletable_colorado").show()
Operazioni di scrittura di flussi strutturati
Con Hive Warehouse Connector, è possibile usare i flussi Spark per scrivere dati nelle tabelle Hive.
Importante
Le operazioni di scrittura di flussi strutturati non sono supportate nei cluster Spark 4.0 abilitati per ESP.
Seguire la procedura per inserire dati da un flusso Spark sulla porta localhost 9999 in una tabella Hive tramite . Hive Warehouse Connector.
Dalla shell Spark aperta, avviare un flusso Spark con il comando seguente:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Generare i dati per il flusso Spark creato, seguendo questa procedura:
- Aprire una seconda sessione SSH nello stesso cluster Spark.
- Al prompt dei comandi digitare
nc -lk 9999
. Questo comando usa l'utilitànetcat
per inviare dati dalla riga di comando alla porta specificata.
Tornare alla prima sessione SSH e creare una nuova tabella Hive per conservare i dati di flusso. Nella shell Spark immettere il comando seguente:
hive.createTable("stream_table").column("value","string").create()
Scrivere quindi i dati di flusso nella tabella appena creata usando il comando seguente:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Importante
Attualmente è necessario impostare le opzioni
metastoreUri
edatabase
in modo manuale a causa di un problema noto di Apache Spark. Per altre informazioni su questo problema, vedere SPARK-25460.Tornare alla seconda sessione SSH e immettere i valori seguenti:
foo HiveSpark bar
Tornare alla prima sessione SSH e notare la breve attività. Per visualizzare i dati, usare il comando seguente:
hive.table("stream_table").show()
Usare CTRL+C per arrestare netcat
la seconda sessione SSH. Usare :q
per uscire dalla shell Spark nella prima sessione SSH.