Opérations d’Apache Spark prises en charge par Hive Warehouse Connector dans Azure HDInsight
Cet article présente les opérations basées sur Spark prises en charge par Hive Warehouse Connector (HWC). Tous les exemples présentés seront exécutés via l’interpréteur de commandes Apache Spark.
Configuration requise
Suivez les étapes de Configuration de Hive Warehouse Connector.
Prise en main
Pour démarrer une session spark-shell, procédez comme suit :
Utilisez une commande ssh pour vous connecter à votre cluster Apache Spark. Modifiez la commande en remplaçant CLUSTERNAME par le nom de votre cluster, puis entrez la commande :
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
À partir de votre session ssh, exécutez la commande suivante pour noter la version de
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Modifiez le code avec la version
hive-warehouse-connector-assembly
identifiée ci-dessus. Exécutez ensuite la commande pour démarrer l’interpréteur de commandes Spark :spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Après avoir démarré l’interpréteur de commandes Spark, une instance Hive Warehouse Connector peut être démarrée à l’aide des commandes suivantes :
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Création de DataFrames Spark à partir de requêtes Hive
Les résultats de toutes les requêtes utilisant la bibliothèque HWC sont renvoyés sous forme de DataFrame. Les exemples suivants montrent comment créer une requête Hive de base.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Les résultats de la requête sont des DataFrames Spark, qui peuvent être utilisés avec les bibliothèques Spark comme MLIB et SparkSQL.
Écriture de DataFrames Spark dans des tables Hive
Spark ne prend pas en charge en mode natif l’écriture dans des tables ACID managées de Hive. HWC vous permet cependant d’écrire n’importe quel DataFrame dans une table Hive. Vous pouvez voir cette fonctionnalité en action dans l'exemple suivant :
Créez une table appelée
sampletable_colorado
et spécifiez ses colonnes en utilisant la commande suivante :hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filtrez la table
hivesampletable
, où la colonnestate
est égale àColorado
. Cette requête Hive retourne un DataFrame Spark et le résultat est enregistré dans la table Hivesampletable_colorado
à l’aide de la fonctionwrite
.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Visualisez les résultats avec la commande suivante :
hive.table("sampletable_colorado").show()
Écritures à l’aide du streaming structuré
Le connecteur d’entrepôt Hive vous permet d’utiliser le streaming Spark pour écrire des données dans des tables Hive.
Important
Les écritures de streaming structuré ne sont pas prises en charge dans les clusters Spark 4.0 à extension ESP.
Suivez les étapes pour ingérer les données d’un flux Spark sur le port localhost 9999 dans une table Hive via. Hive Warehouse Connector.
À partir de votre interpréteur de commandes Spark ouvert, démarrez un flux Spark à l’aide de la commande suivante :
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Procédez comme suit pour générer les données du flux Spark que vous avez créé :
- Ouvrez une deuxième session SSH sur le même cluster Spark.
- À l’invite de commandes, tapez
nc -lk 9999
. Cette commande utilise l’utilitairenetcat
pour envoyer des données de la ligne de commande au port spécifié.
Revenez à la première session SSH et créez une nouvelle table Hive pour contenir les données de streaming. Dans l’interpréteur de commandes de Spark, entrez la commande suivante :
hive.createTable("stream_table").column("value","string").create()
Puis, écrivez les données de streaming dans la table nouvellement créée à l’aide de la commande suivante :
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Important
Les options
metastoreUri
etdatabase
doivent actuellement être définies manuellement en raison d'un problème connu dans Apache Spark. Pour plus d'informations sur ce problème, voir SPARK-25460.Revenez à la deuxième session SSH et entrez les valeurs suivantes :
foo HiveSpark bar
Revenez à la première session SSH et notez la brève activité. Pour afficher les données, utilisez la commande suivante :
hive.table("stream_table").show()
Utilisez Ctrl + C pour arrêter netcat
sur la deuxième session SSH. Utilisez :q
pour quitter l’interpréteur de commandes de Spark sur la première session SSH.