Operacje platformy Apache Spark obsługiwane przez łącznik magazynu Hive w usłudze Azure HDInsight
W tym artykule przedstawiono operacje oparte na platformie Spark obsługiwane przez łącznik magazynu Hive (HWC). Wszystkie pokazane przykłady zostaną wykonane za pomocą powłoki Apache Spark.
Warunek wstępny
Wykonaj kroki konfiguracji łącznika magazynu Hive.
Wprowadzenie
Aby rozpocząć sesję spark-shell, wykonaj następujące czynności:
Użyj polecenia ssh, aby nawiązać połączenie z klastrem Apache Spark. Zmodyfikuj polecenie, zastępując ciąg CLUSTERNAME nazwą klastra, a następnie wprowadź polecenie:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
W sesji SSH wykonaj następujące polecenie, aby zanotować
hive-warehouse-connector-assembly
wersję:ls /usr/hdp/current/hive_warehouse_connector
Zmodyfikuj kod z wersją zidentyfikowaną
hive-warehouse-connector-assembly
powyżej. Następnie wykonaj polecenie , aby uruchomić powłokę spark:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Po uruchomieniu powłoki spark można uruchomić wystąpienie łącznika usługi Hive Warehouse przy użyciu następujących poleceń:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Tworzenie ramek danych platformy Spark przy użyciu zapytań Hive
Wyniki wszystkich zapytań korzystających z biblioteki HWC są zwracane jako ramka danych. W poniższych przykładach pokazano, jak utworzyć podstawowe zapytanie hive.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Wyniki zapytania to ramki danych platformy Spark, których można używać z bibliotekami spark, takimi jak MLIB i SparkSQL.
Zapisywanie ramek danych platformy Spark w tabelach Hive
Platforma Spark nie obsługuje natywnie zapisywania w zarządzanych tabelach ACID programu Hive. Jednak przy użyciu HWC można zapisać dowolną ramkę danych w tabeli Programu Hive. Ta funkcja działa w następującym przykładzie:
Utwórz tabelę o nazwie
sampletable_colorado
i określ jej kolumny przy użyciu następującego polecenia:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Przefiltruj tabelę
hivesampletable
, w której kolumnastate
maColorado
wartość . To zapytanie hive zwraca ramkę danych platformy Spark i wynik jest zapisywany w tabelisampletable_colorado
Hive przy użyciuwrite
funkcji .hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Wyświetl wyniki za pomocą następującego polecenia:
hive.table("sampletable_colorado").show()
Zapisy przesyłania strumieniowego ze strukturą
Za pomocą łącznika magazynu Hive możesz użyć przesyłania strumieniowego platformy Spark do zapisywania danych w tabelach hive.
Ważne
Zapisy przesyłania strumieniowego ze strukturą nie są obsługiwane w klastrach Spark 4.0 z obsługą esp.
Postępuj zgodnie z instrukcjami, aby pozyskiwać dane ze strumienia Spark na porcie localhost 9999 do tabeli Programu Hive za pośrednictwem. Łącznik magazynu Hive.
Z poziomu otwartej powłoki Spark rozpocznij strumień spark za pomocą następującego polecenia:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Wygeneruj dane dla utworzonego strumienia Spark, wykonując następujące czynności:
- Otwórz drugą sesję SSH w tym samym klastrze Spark.
- W wierszu polecenia wpisz
nc -lk 9999
. To polecenie używanetcat
narzędzia do wysyłania danych z wiersza polecenia do określonego portu.
Wróć do pierwszej sesji SSH i utwórz nową tabelę programu Hive w celu przechowywania danych przesyłanych strumieniowo. W powłoce spark wprowadź następujące polecenie:
hive.createTable("stream_table").column("value","string").create()
Następnie zapisz dane przesyłane strumieniowo do nowo utworzonej tabeli przy użyciu następującego polecenia:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Ważne
Opcje
metastoreUri
idatabase
muszą być obecnie ustawione ręcznie z powodu znanego problemu na platformie Apache Spark. Aby uzyskać więcej informacji na temat tego problemu, zobacz SPARK-25460.Wróć do drugiej sesji SSH i wprowadź następujące wartości:
foo HiveSpark bar
Wróć do pierwszej sesji SSH i zanotuj krótkie działanie. Użyj następującego polecenia, aby wyświetlić dane:
hive.table("stream_table").show()
Naciśnij Ctrl + C , aby zatrzymać netcat
się w drugiej sesji SSH. Użyj polecenia :q
, aby zamknąć powłokę spark w pierwszej sesji SSH.