Operações do Apache Spark suportadas pelo Hive Warehouse Connector no Azure HDInsight
Este artigo mostra operações baseadas em faísca suportadas pelo Hive Warehouse Connector (HWC). Todos os exemplos mostrados serão executados através do shell do Apache Spark.
Pré-requisito
Conclua as etapas de configuração do Hive Warehouse Connector.
Introdução
Para iniciar uma sessão de spark-shell, execute as seguintes etapas:
Use o comando ssh para se conectar ao cluster do Apache Spark. Edite o comando substituindo CLUSTERNAME pelo nome do cluster e digite o comando:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Na sessão ssh, execute o seguinte comando para anotar a
hive-warehouse-connector-assembly
versão:ls /usr/hdp/current/hive_warehouse_connector
Edite o código com a versão identificada
hive-warehouse-connector-assembly
acima. Em seguida, execute o comando para iniciar o shell de faísca:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Depois de iniciar o spark-shell, uma instância do Hive Warehouse Connector pode ser iniciada usando os seguintes comandos:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Criando DataFrames do Spark usando consultas do Hive
Os resultados de todas as consultas usando a biblioteca HWC são retornados como um DataFrame. Os exemplos a seguir demonstram como criar uma consulta de hive básica.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Os resultados da consulta são Spark DataFrames, que podem ser usados com bibliotecas Spark como MLIB e SparkSQL.
Escrevendo DataFrames do Spark em tabelas do Hive
O Spark não suporta nativamente a gravação nas tabelas ACID gerenciadas do Hive. No entanto, usando HWC, você pode gravar qualquer DataFrame em uma tabela Hive. Você pode ver essa funcionalidade em ação no exemplo a seguir:
Crie uma tabela chamada
sampletable_colorado
e especifique suas colunas usando o seguinte comando:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filtre a tabela
hivesampletable
onde a colunastate
éColorado
igual a . Essa consulta de hive retorna um Spark DataFrame e o resultado é salvo na tabelasampletable_colorado
Hive usando awrite
função.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Veja os resultados com o seguinte comando:
hive.table("sampletable_colorado").show()
Gravações de streaming estruturadas
Usando o Hive Warehouse Connector, você pode usar o streaming do Spark para gravar dados em tabelas do Hive.
Importante
Não há suporte para gravações de streaming estruturadas em clusters Spark 4.0 habilitados para ESP.
Siga as etapas para ingerir dados de um fluxo do Spark na porta localhost 9999 em uma tabela Hive via. Conector de armazém do Hive.
A partir do shell aberto do Spark, inicie um fluxo de faísca com o seguinte comando:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Gere dados para o fluxo do Spark que você criou, executando as seguintes etapas:
- Abra uma segunda sessão SSH no mesmo cluster do Spark.
- No prompt de comando, digite
nc -lk 9999
. Este comando usa onetcat
utilitário para enviar dados da linha de comando para a porta especificada.
Retorne à primeira sessão SSH e crie uma nova tabela do Hive para armazenar os dados de streaming. No spark-shell, digite o seguinte comando:
hive.createTable("stream_table").column("value","string").create()
Em seguida, escreva os dados de streaming na tabela recém-criada usando o seguinte comando:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Importante
As
metastoreUri
opções edatabase
devem ser definidas manualmente devido a um problema conhecido no Apache Spark. Para obter mais informações sobre esse problema, consulte SPARK-25460.Retorne à segunda sessão SSH e insira os seguintes valores:
foo HiveSpark bar
Volte para a primeira sessão SSH e anote a breve atividade. Use o seguinte comando para exibir os dados:
hive.table("stream_table").show()
Use Ctrl + C para parar netcat
na segunda sessão SSH. Use :q
para sair do spark-shell na primeira sessão SSH.