Операции Apache Spark, поддерживаемые Hive Warehouse Connector в Azure HDInsight
В этой статье показаны операции на основе Spark, поддерживаемые Hive Warehouse Connector (HWC). Все приведенные примеры будут выполняться с помощью оболочки Apache Spark.
Предварительные требования
Выполните инструкции из статьи Настройка Hive Warehouse Connector.
Начало работы
Чтобы начать сеанс spark-shell, выполните указанные ниже действия.
С помощью команды ssh command подключитесь к кластеру Apache Spark. Измените команду, заменив CLUSTERNAME именем кластера, а затем введите команду:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
В сеансе SSH выполните следующую команду, чтобы отметить версию
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Измените код с помощью указанной
hive-warehouse-connector-assembly
выше версии. Затем выполните команду, чтобы запустить оболочку Spark:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
После запуска spark-shell экземпляр соединителя Хранилища Hive можно запустить с помощью следующих команд:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Создание кадров данных Spark с помощью запросов Hive
Результаты всех запросов, использующих библиотеку HWC, возвращаются в виде кадров данных. В следующих примерах показано, как создать базовый запрос Hive.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Результаты запроса представляют собой фрагменты данных Spark, которые можно использовать с библиотеками Spark, такими как MLIB и SparkSQL.
Запись кадров данных Spark в таблицы Hive
Spark изначально не поддерживает запись в управляемые Hive таблицы ACID. Однако с помощью HWC можно записать в таблицу Hive любой кадр данных. Эта возможность демонстрируется в следующем примере.
Создайте таблицу с именем
sampletable_colorado
и укажите ее столбцы с помощью следующей команды:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Отфильтруйте таблицу
hivesampletable
, где столбецstate
равенColorado
. Этот запрос Hive возвращает кадр данных Spark, который сохраняется в таблице Hivesampletable_colorado
с помощью функцииwrite
.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Просмотрите результаты с помощью следующей команды:
hive.table("sampletable_colorado").show()
Операции записи посредством структурированной потоковой передачи
С помощью Hive Warehouse Connector можно использовать потоковую передачу Spark для записи данных в таблицы Hive.
Внимание
Операции записи посредством структурированной потоковой передачи не поддерживаются в кластерах Spark 4.0 с поддержкой ESP.
Выполните действия по приему данных из потока Spark через порт localhost 9999 в таблицу Hive. выполните указанные ниже действия.
В открытой оболочке Spark запустите поток Spark с помощью следующей команды:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Создайте данные для созданного потока Spark, выполнив указанные ниже действия.
- Откройте второй сеанс SSH в том же кластере Spark.
- В командной строке введите
nc -lk 9999
. Эта команда использует служебную программуnetcat
для отправки данных из командной строки на указанный порт.
Вернитесь к первому сеансу SSH и создайте таблицу Hive для хранения данных потоковой передачи. В оболочке Spark введите следующую команду:
hive.createTable("stream_table").column("value","string").create()
Затем запишите данные потоковой передачи в созданную таблицу с помощью следующей команды:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Внимание
В настоящее время параметры
metastoreUri
иdatabase
должны задаваться вручную из-за известной проблемы в Apache Spark. Дополнительные сведения об этой проблеме см. на странице SPARK-25460.Вернитесь ко второму сеансу SSH и введите следующие значения:
foo HiveSpark bar
Вернитесь к первому сеансу SSH и обратите внимание на кратковременные действия. Для просмотра данных используйте следующую команду:
hive.table("stream_table").show()
Нажмите клавиши CTRL+C, чтобы остановить работу netcat
во втором сеансе SSH. Используйте :q
, чтобы выйти из оболочки Spark в первом сеансе SSH.
Следующие шаги
- Интеграция Hive Warehouse Connector с Apache Spark и Apache Hive
- Use Interactive Query with HDInsight (Использование Interactive Query в HDInsight)
- Интеграция HWC с Apache Zeppelin
- API-интерфейсы, поддерживаемые HWC