Använda Apache Spark för att läsa och skriva Apache HBase-data
Apache HBase efterfrågas vanligtvis antingen med sitt lågnivå-API (genomsökningar, hämtar och placerar) eller med en SQL-syntax med hjälp av Apache Phoenix. Apache tillhandahåller även Apache Spark HBase-Anslut eller. Anslut eller är ett bekvämt och effektivt alternativ för att fråga efter och ändra data som lagras av HBase.
Förutsättningar
Två separata HDInsight-kluster som distribuerats i samma virtuella nätverk. En HBase och en Spark med minst Spark 2.1 (HDInsight 3.6) installerat. Mer information finns i Skapa Linux-baserade kluster i HDInsight med hjälp av Azure-portalen.
URI-schemat för dina klusters primära lagring. Det här schemat skulle vara wasb:// för Azure Blob Storage,
abfs://
för Azure Data Lake Storage Gen2 eller adl:// för Azure Data Lake Storage Gen1. Om säker överföring är aktiverad för Blob Storage blirwasbs://
URI:n . Se även säker överföring.
Övergripande process
Den övergripande processen för att aktivera Spark-klustret för att fråga ditt HBase-kluster är följande:
- Förbered några exempeldata i HBase.
- Hämta hbase-site.xml-filen från konfigurationsmappen för HBase-klustret (/etc/hbase/conf) och placera en kopia av hbase-site.xml i konfigurationsmappen för Spark 2 (/etc/spark2/conf). (VALFRITT: använd skript som tillhandahålls av HDInsight-teamet för att automatisera den här processen)
- Kör
spark-shell
och referera till Spark HBase-Anslut eller med dess Maven-koordinater i alternativetpackages
. - Definiera en katalog som mappar schemat från Spark till HBase.
- Interagera med HBase-data med rdd- eller DataFrame-API:er.
Förbereda exempeldata i Apache HBase
I det här steget skapar och fyller du i en tabell i Apache HBase som du sedan kan köra frågor mot med Spark.
ssh
Använd kommandot för att ansluta till ditt HBase-kluster. Redigera kommandot genom attHBASECLUSTER
ersätta med namnet på ditt HBase-kluster och ange sedan kommandot:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
hbase shell
Använd kommandot för att starta det interaktiva HBase-gränssnittet. Ange följande kommando i din SSH-anslutning:hbase shell
create
Använd kommandot för att skapa en HBase-tabell med tvåkolumnsfamiljer. Ange följande kommando:create 'Contacts', 'Personal', 'Office'
put
Använd kommandot för att infoga värden i en angiven kolumn på en angiven rad i en viss tabell. Ange följande kommando:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
exit
Använd kommandot för att stoppa det interaktiva HBase-gränssnittet. Ange följande kommando:exit
Kör skript för att konfigurera anslutning mellan kluster
Om du vill konfigurera kommunikationen mellan kluster följer du stegen för att köra två skript i dina kluster. Dessa skript automatiserar processen för filkopiering som beskrivs i avsnittet Konfigurera kommunikation manuellt.
- Skriptet som du kör från HBase-klustret laddar upp
hbase-site.xml
och HBase IP-mappningsinformation till standardlagringen som är kopplad till ditt Spark-kluster. - Skriptet som du kör från Spark-klustret konfigurerar två cron-jobb för att köra två hjälpskript med jämna mellanrum:
- HBase cron-jobb – ladda ned nya
hbase-site.xml
filer och HBase IP-mappning från Spark-standardlagringskontot till den lokala noden - Spark cron-jobb – kontrollerar om en Spark-skalning har inträffat och om klustret är säkert. I så fall redigerar
/etc/hosts
du för att inkludera HBase IP-mappning som lagras lokalt
- HBase cron-jobb – ladda ned nya
Obs! Kontrollera att du har lagt till Spark-klustrets lagringskonto i HBase-klustret som ett sekundärt lagringskonto innan du fortsätter. Kontrollera att skripten är i ordning enligt angiven ordning.
Använd Skriptåtgärd i ditt HBase-kluster för att tillämpa ändringarna med följande överväganden:
Property Värde Bash-skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Nodtyper Region Parametrar -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Framhärdade ja SECONDARYS_STORAGE_URL
är url:en för Standardlagring på Spark-sidan. Parameterexempel:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Använd Skriptåtgärd i Spark-klustret för att tillämpa ändringarna med följande överväganden:
Property Värde Bash-skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Nodtyper Chef, Arbetare, Zookeeper Parametrar -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Framhärdade ja - Du kan ange hur ofta du vill att det här klustret ska kontrollera om det uppdateras automatiskt. Standard: -s "*/1 * * * *" -h 0 (I det här exemplet körs Spark cron varje minut, medan HBase-cronen inte körs)
- Eftersom HBase cron inte har konfigurerats som standard måste du köra skriptet igen när du utför skalning till ditt HBase-kluster. Om ditt HBase-kluster skalas ofta kan du välja att konfigurera HBase cron-jobb automatiskt. Till exempel:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
konfigurerar skriptet för att utföra kontroller var 30:e minut. Detta kör HBase cron-schema regelbundet för att automatisera nedladdningen av ny HBase-information på det gemensamma lagringskontot till den lokala noden.
Kommentar
Dessa skript fungerar endast på HDI 5.0- och HDI 5.1-kluster.
Konfigurera kommunikation manuellt (valfritt, om det angivna skriptet i ovanstående steg misslyckas)
Obs! Dessa steg måste utföras varje gång ett av klustren genomgår en skalningsaktivitet.
Kopiera hbase-site.xml från lokal lagring till roten för Spark-klustrets standardlagring. Redigera kommandot för att återspegla konfigurationen. Ange sedan kommandot från den öppna SSH-sessionen till HBase-klustret:
Syntaxvärde Nytt värde URI-schema Ändra för att återspegla lagringen. Syntaxen är för bloblagring med säker överföring aktiverad. SPARK_STORAGE_CONTAINER
Ersätt med standardnamnet för lagringscontainern som används för Spark-klustret. SPARK_STORAGE_ACCOUNT
Ersätt med standardnamnet för lagringskontot som används för Spark-klustret. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Avsluta sedan ssh-anslutningen till HBase-klustret.
exit
Anslut till huvudnoden i spark-klustret med hjälp av SSH. Redigera kommandot genom att
SPARKCLUSTER
ersätta med namnet på ditt Spark-kluster och ange sedan kommandot:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Ange kommandot för att kopiera
hbase-site.xml
från Spark-klustrets standardlagring till Spark 2-konfigurationsmappen på klustrets lokala lagring:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Kör Spark Shell som refererar till Spark HBase-Anslut eller
När du har slutfört föregående steg bör du kunna köra Spark-gränssnittet och referera till lämplig version av Spark HBase-Anslut eller.
I följande tabell visas till exempel två versioner och motsvarande kommandon som HDInsight-teamet använder för närvarande. Du kan använda samma versioner för dina kluster om versionerna av HBase och Spark är samma som i tabellen.
I den öppna SSH-sessionen till Spark-klustret anger du följande kommando för att starta ett Spark-gränssnitt:
Spark-version HDI HBase-version SHC-version Command 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Håll den här Spark Shell-instansen öppen och fortsätt att definiera en katalog och fråga. Om du inte hittar jar-filerna som motsvarar dina versioner på SHC Core-lagringsplatsen fortsätter du att läsa.
För efterföljande kombinationer av Spark- och HBase-versioner publiceras inte längre dessa artefakter på lagringsplatsen ovan. Du kan skapa jar-filerna direkt från GitHub-grenen spark-hbase-connector . Om du till exempel kör med Spark 2.4 och HBase 2.1 utför du följande steg:
Klona lagringsplatsen:
git clone https://github.com/hortonworks-spark/shc
Gå till branch-2.4:
git checkout branch-2.4
Skapa från grenen (skapar en .jar-fil):
mvn clean package -DskipTests
Kör följande kommando (se till att ändra det .jar namn som motsvarar den .jar fil som du skapade):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Håll den här Spark Shell-instansen öppen och fortsätt till nästa avsnitt.
Definiera en katalog och fråga
I det här steget definierar du ett katalogobjekt som mappar schemat från Apache Spark till Apache HBase.
I ditt öppna Spark Shell anger du följande
import
instruktioner:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Ange kommandot nedan för att definiera en katalog för tabellen Kontakter som du skapade i HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Koden:
- Definierar ett katalogschema för HBase-tabellen med namnet
Contacts
. - Identifierar radnyckeln som
key
och mappar kolumnnamnen som används i Spark till kolumnfamiljen, kolumnnamnet och kolumntypen som används i HBase. - Definierar radnyckeln i detalj som en namngiven kolumn (
rowkey
), som har en specifik kolumnfamiljcf
medrowkey
.
- Definierar ett katalogschema för HBase-tabellen med namnet
Ange kommandot för att definiera en metod som tillhandahåller en DataFrame runt tabellen
Contacts
i HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Skapa en instans av DataFrame:
val df = withCatalog(catalog)
Fråga DataFrame:
df.show()
Du bör se två rader med data:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Registrera en tillfällig tabell så att du kan köra frågor mot HBase-tabellen med Spark SQL:
df.createTempView("contacts")
Utfärda en SQL-fråga mot
contacts
tabellen:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Du bör se resultat som dessa:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Infoga nya data
Om du vill infoga en ny kontaktpost definierar du en
ContactRecord
klass:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Skapa en instans av
ContactRecord
och placera den i en matris:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Spara matrisen med nya data till HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Granska resultaten:
df.show()
Du bör se utdata som ser ut så här:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Stäng Spark-gränssnittet genom att ange följande kommando:
:q