Поделиться через


Операции Apache Spark, поддерживаемые Hive Warehouse Connector в Azure HDInsight

В этой статье показаны операции на основе Spark, поддерживаемые Hive Warehouse Connector (HWC). Все приведенные примеры будут выполняться с помощью оболочки Apache Spark.

Предварительные требования

Выполните инструкции из статьи Настройка Hive Warehouse Connector.

Начало работы

Чтобы начать сеанс spark-shell, выполните указанные ниже действия.

  1. С помощью команды ssh command подключитесь к кластеру Apache Spark. Измените команду, заменив CLUSTERNAME именем кластера, а затем введите команду:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. В сеансе SSH выполните следующую команду, чтобы отметить версию hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Измените код с помощью указанной hive-warehouse-connector-assembly выше версии. Затем выполните команду, чтобы запустить оболочку Spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. После запуска spark-shell экземпляр соединителя Хранилища Hive можно запустить с помощью следующих команд:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Создание кадров данных Spark с помощью запросов Hive

Результаты всех запросов, использующих библиотеку HWC, возвращаются в виде кадров данных. В следующих примерах показано, как создать базовый запрос Hive.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Результаты запроса представляют собой фрагменты данных Spark, которые можно использовать с библиотеками Spark, такими как MLIB и SparkSQL.

Запись кадров данных Spark в таблицы Hive

Spark изначально не поддерживает запись в управляемые Hive таблицы ACID. Однако с помощью HWC можно записать в таблицу Hive любой кадр данных. Эта возможность демонстрируется в следующем примере.

  1. Создайте таблицу с именем sampletable_colorado и укажите ее столбцы с помощью следующей команды:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Отфильтруйте таблицу hivesampletable, где столбец state равен Colorado. Этот запрос Hive возвращает кадр данных Spark, который сохраняется в таблице Hive sampletable_colorado с помощью функции write.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Просмотрите результаты с помощью следующей команды:

    hive.table("sampletable_colorado").show()
    

    Соединитель хранилища hive отображает таблицу hive.

Операции записи посредством структурированной потоковой передачи

С помощью Hive Warehouse Connector можно использовать потоковую передачу Spark для записи данных в таблицы Hive.

Внимание

Операции записи посредством структурированной потоковой передачи не поддерживаются в кластерах Spark 4.0 с поддержкой ESP.

Выполните действия по приему данных из потока Spark через порт localhost 9999 в таблицу Hive. выполните указанные ниже действия.

  1. В открытой оболочке Spark запустите поток Spark с помощью следующей команды:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Создайте данные для созданного потока Spark, выполнив указанные ниже действия.

    1. Откройте второй сеанс SSH в том же кластере Spark.
    2. В командной строке введите nc -lk 9999. Эта команда использует служебную программу netcat для отправки данных из командной строки на указанный порт.
  3. Вернитесь к первому сеансу SSH и создайте таблицу Hive для хранения данных потоковой передачи. В оболочке Spark введите следующую команду:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Затем запишите данные потоковой передачи в созданную таблицу с помощью следующей команды:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Внимание

    В настоящее время параметры metastoreUri и database должны задаваться вручную из-за известной проблемы в Apache Spark. Дополнительные сведения об этой проблеме см. на странице SPARK-25460.

  5. Вернитесь ко второму сеансу SSH и введите следующие значения:

    foo
    HiveSpark
    bar
    
  6. Вернитесь к первому сеансу SSH и обратите внимание на кратковременные действия. Для просмотра данных используйте следующую команду:

    hive.table("stream_table").show()
    

Нажмите клавиши CTRL+C, чтобы остановить работу netcat во втором сеансе SSH. Используйте :q, чтобы выйти из оболочки Spark в первом сеансе SSH.

Следующие шаги