Udostępnij za pośrednictwem


Operacje platformy Apache Spark obsługiwane przez łącznik magazynu Hive w usłudze Azure HDInsight

W tym artykule przedstawiono operacje oparte na platformie Spark obsługiwane przez łącznik magazynu Hive (HWC). Wszystkie pokazane przykłady zostaną wykonane za pomocą powłoki Apache Spark.

Warunek wstępny

Wykonaj kroki konfiguracji łącznika magazynu Hive.

Wprowadzenie

Aby rozpocząć sesję spark-shell, wykonaj następujące czynności:

  1. Użyj polecenia ssh, aby nawiązać połączenie z klastrem Apache Spark. Zmodyfikuj polecenie, zastępując ciąg CLUSTERNAME nazwą klastra, a następnie wprowadź polecenie:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. W sesji SSH wykonaj następujące polecenie, aby zanotować hive-warehouse-connector-assembly wersję:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Zmodyfikuj kod z wersją zidentyfikowaną hive-warehouse-connector-assembly powyżej. Następnie wykonaj polecenie , aby uruchomić powłokę spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Po uruchomieniu powłoki spark można uruchomić wystąpienie łącznika usługi Hive Warehouse przy użyciu następujących poleceń:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Tworzenie ramek danych platformy Spark przy użyciu zapytań Hive

Wyniki wszystkich zapytań korzystających z biblioteki HWC są zwracane jako ramka danych. W poniższych przykładach pokazano, jak utworzyć podstawowe zapytanie hive.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Wyniki zapytania to ramki danych platformy Spark, których można używać z bibliotekami spark, takimi jak MLIB i SparkSQL.

Zapisywanie ramek danych platformy Spark w tabelach Hive

Platforma Spark nie obsługuje natywnie zapisywania w zarządzanych tabelach ACID programu Hive. Jednak przy użyciu HWC można zapisać dowolną ramkę danych w tabeli Programu Hive. Ta funkcja działa w następującym przykładzie:

  1. Utwórz tabelę o nazwie sampletable_colorado i określ jej kolumny przy użyciu następującego polecenia:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Przefiltruj tabelę hivesampletable , w której kolumna state ma Coloradowartość . To zapytanie hive zwraca ramkę danych platformy Spark i wynik jest zapisywany w tabeli sampletable_colorado Hive przy użyciu write funkcji .

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Wyświetl wyniki za pomocą następującego polecenia:

    hive.table("sampletable_colorado").show()
    

    Łącznik magazynu hive pokazuje tabelę hive.

Zapisy przesyłania strumieniowego ze strukturą

Za pomocą łącznika magazynu Hive możesz użyć przesyłania strumieniowego platformy Spark do zapisywania danych w tabelach hive.

Ważne

Zapisy przesyłania strumieniowego ze strukturą nie są obsługiwane w klastrach Spark 4.0 z obsługą esp.

Postępuj zgodnie z instrukcjami, aby pozyskiwać dane ze strumienia Spark na porcie localhost 9999 do tabeli Programu Hive za pośrednictwem. Łącznik magazynu Hive.

  1. Z poziomu otwartej powłoki Spark rozpocznij strumień spark za pomocą następującego polecenia:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Wygeneruj dane dla utworzonego strumienia Spark, wykonując następujące czynności:

    1. Otwórz drugą sesję SSH w tym samym klastrze Spark.
    2. W wierszu polecenia wpisz nc -lk 9999. To polecenie używa netcat narzędzia do wysyłania danych z wiersza polecenia do określonego portu.
  3. Wróć do pierwszej sesji SSH i utwórz nową tabelę programu Hive w celu przechowywania danych przesyłanych strumieniowo. W powłoce spark wprowadź następujące polecenie:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Następnie zapisz dane przesyłane strumieniowo do nowo utworzonej tabeli przy użyciu następującego polecenia:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Ważne

    Opcje metastoreUri i database muszą być obecnie ustawione ręcznie z powodu znanego problemu na platformie Apache Spark. Aby uzyskać więcej informacji na temat tego problemu, zobacz SPARK-25460.

  5. Wróć do drugiej sesji SSH i wprowadź następujące wartości:

    foo
    HiveSpark
    bar
    
  6. Wróć do pierwszej sesji SSH i zanotuj krótkie działanie. Użyj następującego polecenia, aby wyświetlić dane:

    hive.table("stream_table").show()
    

Naciśnij Ctrl + C , aby zatrzymać netcat się w drugiej sesji SSH. Użyj polecenia :q , aby zamknąć powłokę spark w pierwszej sesji SSH.

Następne kroki