Použití Apache Sparku ke čtení a zápisu dat Apache HBase
Apache HBase se obvykle dotazuje pomocí rozhraní API nízké úrovně (prohledávání, získání a vkládání) nebo pomocí syntaxe SQL pomocí Apache Phoenixu. Apache také poskytuje Připojení or Apache Spark HBase. Připojení or je praktickou a efektivní alternativou k dotazování a úpravě dat uložených HBase.
Požadavky
Dva samostatné clustery HDInsight nasazené ve stejné virtuální síti. Jeden HBase a jeden Spark s nainstalovaným alespoň Sparkem 2.1 (HDInsight 3.6). Další informace najdete v tématu Vytváření clusterů založených na Linuxu ve službě HDInsight pomocí webu Azure Portal.
Schéma identifikátoru URI pro primární úložiště clusterů. Toto schéma by bylo wasb:// pro Azure Blob Storage,
abfs://
pro Azure Data Lake Storage Gen2 nebo adl:// pro Azure Data Lake Storage Gen1. Pokud je pro Blob Storage povolený zabezpečený přenos, identifikátor URI by bylwasbs://
. Viz také zabezpečený přenos.
Celkový proces
Proces vysoké úrovně pro povolení dotazování clusteru Spark na cluster HBase je následující:
- Připravte ukázková data v HBase.
- Získejte soubor hbase-site.xml ze složky konfigurace clusteru HBase (/etc/hbase/conf) a vložte kopii hbase-site.xml do konfigurační složky Sparku 2 (/etc/spark2/conf). (VOLITELNÉ: Použití skriptu poskytovaného týmem HDInsight k automatizaci tohoto procesu)
- Spusťte
spark-shell
odkazování na Spark HBase Připojení or podle souřadnic Mavenupackages
v možnosti. - Definujte katalog, který mapuje schéma ze Sparku na HBase.
- Interakce s daty HBase pomocí rozhraní API rdD nebo datového rámce
Příprava ukázkových dat v Apache HBase
V tomto kroku vytvoříte a naplníte tabulku v Apache HBase, kterou pak můžete dotazovat pomocí Sparku.
ssh
Pomocí příkazu se připojte ke clusteru HBase. Upravte příkaz nahrazenímHBASECLUSTER
názvu clusteru HBase a pak zadejte příkaz:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
hbase shell
Pomocí příkazu spusťte interaktivní prostředí HBase. Do připojení SSH zadejte následující příkaz:hbase shell
create
Pomocí příkazu vytvořte tabulku HBase se dvěma sloupci. Zadejte tento příkaz:create 'Contacts', 'Personal', 'Office'
put
Pomocí příkazu vložte hodnoty do zadaného sloupce v zadaném řádku v konkrétní tabulce. Zadejte tento příkaz:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
exit
Pomocí příkazu zastavte interaktivní prostředí HBase. Zadejte tento příkaz:exit
Spuštění skriptů pro nastavení připojení mezi clustery
Pokud chcete nastavit komunikaci mezi clustery, postupujte podle kroků pro spuštění dvou skriptů v clusterech. Tyto skripty automatizují proces kopírování souborů popsaný v části Nastavení komunikace ručně.
- Skript, který spustíte z clusteru HBase, nahraje
hbase-site.xml
informace o mapování ip adres HBase do výchozího úložiště připojeného k vašemu clusteru Spark. - Skript, který spustíte z clusteru Spark, nastaví dvě úlohy cron, aby pravidelně spouštějí dva pomocné skripty:
- Úloha HBase cron – stažení nových
hbase-site.xml
souborů a mapování IP adres HBase z výchozího účtu úložiště Sparku na místní uzel - Úloha Spark cron – kontroluje, jestli došlo ke škálování Sparku a jestli je cluster zabezpečený. Pokud ano, upravte
/etc/hosts
mapování IP adres HBase uložené místně.
- Úloha HBase cron – stažení nových
POZNÁMKA: Než budete pokračovat, ujistěte se, že jste do clusteru HBase přidali účet úložiště clusteru Spark jako sekundární účet úložiště. Ujistěte se, že jsou skripty v uvedeném pořadí.
Pomocí akce skriptu v clusteru HBase použijte změny s následujícími aspekty:
Vlastnost Hodnota Identifikátor URI skriptu Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Typy uzlů Oblast Parametry -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Trvalé ano SECONDARYS_STORAGE_URL
je adresa URL výchozího úložiště na straně Sparku. Příklad parametru:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Pomocí akce skriptu v clusteru Spark použijte změny s následujícími aspekty:
Vlastnost Hodnota Identifikátor URI skriptu Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Typy uzlů Vedoucí, Pracovník, Zookeeper Parametry -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Trvalé ano - Můžete určit, jak často má tento cluster automaticky kontrolovat, jestli se aktualizuje. Výchozí hodnota: -s "*/1 * * *" -h 0 (V tomto příkladu se spark cron spouští každou minutu, zatímco HBase cron se nespustí)
- Vzhledem k tomu, že HBase cron není ve výchozím nastavení nastavený, je nutné tento skript spustit znovu při provádění škálování clusteru HBase. Pokud se cluster HBase často škáluje, můžete se rozhodnout nastavit úlohu HBase cron automaticky. Například:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
Nakonfiguruje skript tak, aby prováděl kontroly každých 30 minut. Tím se bude pravidelně spouštět plán HBase cron, který automatizuje stahování nových informací HBase o společném účtu úložiště do místního uzlu.
Poznámka:
Tyto skripty fungují pouze v clusterech HDI 5.0 a HDI 5.1.
Ruční nastavení komunikace (volitelné, pokud je zadaný skript v předchozím kroku neúspěšný)
POZNÁMKA: Tyto kroky je potřeba provést pokaždé, když jeden z clusterů prochází aktivitou škálování.
Zkopírujte hbase-site.xml z místního úložiště do kořenového adresáře výchozího úložiště clusteru Spark. Upravte příkaz tak, aby odrážel vaši konfiguraci. Pak z otevřené relace SSH do clusteru HBase zadejte příkaz:
Hodnota syntaxe Nová hodnota Schéma identifikátoru URI Upravte úložiště tak, aby odráželo vaše úložiště. Syntaxe je určená pro úložiště objektů blob s povoleným zabezpečeným přenosem. SPARK_STORAGE_CONTAINER
Nahraďte výchozí název kontejneru úložiště používaným pro cluster Spark. SPARK_STORAGE_ACCOUNT
Nahraďte výchozím názvem účtu úložiště používaným pro cluster Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Pak ukončete připojení ssh ke clusteru HBase.
exit
Připojení k hlavnímu uzlu clusteru Spark pomocí SSH. Upravte příkaz nahrazením
SPARKCLUSTER
názvu clusteru Spark a pak zadejte tento příkaz:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Zadejte příkaz pro zkopírování
hbase-site.xml
z výchozího úložiště clusteru Spark do konfigurační složky Sparku 2 v místním úložišti clusteru:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Spuštění Spark Shellu odkazující na spark HBase Připojení or
Po dokončení předchozího kroku byste měli být schopni spustit prostředí Spark a odkazovat na příslušnou verzi Spark HBase Připojení or.
V následující tabulce jsou uvedeny dvě verze a odpovídající příkazy, které tým HDInsight aktuálně používá. Stejné verze můžete použít pro clustery, pokud jsou verze HBase a Spark stejné jako v tabulce.
V otevřené relaci SSH ke clusteru Spark zadejte následující příkaz, který spustí prostředí Spark:
Verze Sparku Verze HDI HBase Verze SHC Příkaz 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Ponechte tuto instanci prostředí Spark otevřenou a pokračujte v definování katalogu a dotazu. Pokud nenajdete soubory JAR, které odpovídají vašim verzím v úložišti SHC Core, pokračujte ve čtení.
U následných kombinací verzí Sparku a HBase se tyto artefakty už nepublikují na výše uvedeném úložišti. Soubory JAR můžete sestavit přímo z větve GitHubu pro spark-hbase-connector . Pokud například používáte Spark 2.4 a HBase 2.1, proveďte tyto kroky:
Naklonujte úložiště:
git clone https://github.com/hortonworks-spark/shc
Přejděte na větev-2.4:
git checkout branch-2.4
Sestavte z větve (vytvoří soubor .jar):
mvn clean package -DskipTests
Spusťte následující příkaz (nezapomeňte změnit název .jar, který odpovídá vytvořenému souboru .jar):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Ponechte tuto instanci prostředí Spark otevřenou a pokračujte k další části.
Definování katalogu a dotazu
V tomto kroku definujete objekt katalogu, který mapuje schéma z Apache Sparku na Apache HBase.
V otevřeném prostředí Spark Shell zadejte následující
import
příkazy:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Zadáním následujícího příkazu definujte katalog pro tabulku Kontaktů, kterou jste vytvořili v HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Kód:
- Definuje schéma katalogu pro tabulku HBase s názvem
Contacts
. - Identifikuje klíč řádku jako
key
a mapuje názvy sloupců použité ve Sparku na rodinu sloupců, název sloupce a typ sloupce, jak se používá v HBase. - Definuje detailně klíč řádku jako pojmenovaný sloupec (
rowkey
), který má určitou řaducf
rowkey
sloupců .
- Definuje schéma katalogu pro tabulku HBase s názvem
Zadáním příkazu definujte metodu, která poskytuje datový rámec kolem
Contacts
tabulky v HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Vytvořte instanci datového rámce:
val df = withCatalog(catalog)
Dotaz na datový rámec:
df.show()
Měli byste vidět dva řádky dat:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Zaregistrujte dočasnou tabulku, abyste mohli dotazovat tabulku HBase pomocí Spark SQL:
df.createTempView("contacts")
Zadejte dotaz SQL na
contacts
tabulku:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Výsledky by se měly zobrazit takto:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Vložení nových dat
Pokud chcete vložit nový záznam kontaktu, definujte
ContactRecord
třídu:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Vytvořte instanci
ContactRecord
a vložte ji do pole:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Uložte pole nových dat do HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Prozkoumejte výsledky:
df.show()
Měl by se zobrazit výstup podobný tomuto:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Zavřete prostředí Spark zadáním následujícího příkazu:
:q