Delen via


Apache Spark gebruiken om Apache HBase-gegevens te lezen en schrijven

Apache HBase wordt doorgaans opgevraagd met de API op laag niveau (scans, haalt en plaatst) of met een SQL-syntaxis met behulp van Apache Phoenix. Apache biedt ook de Apache Spark HBase-Verbinding maken or. De Verbinding maken or is een handig en efficiënt alternatief voor het opvragen en wijzigen van gegevens die zijn opgeslagen door HBase.

Vereisten

  • Twee afzonderlijke HDInsight-clusters die in hetzelfde virtuele netwerk zijn geïmplementeerd. Eén HBase en één Spark waarop ten minste Spark 2.1 (HDInsight 3.6) is geïnstalleerd. Zie Clusters op basis van Linux maken in HDInsight met behulp van Azure Portal voor meer informatie.

  • Het URI-schema voor de primaire opslag voor uw clusters. Dit schema wordt wasb:// voor Azure Blob Storage, abfs:// voor Azure Data Lake Storage Gen2 of adl:// voor Azure Data Lake Storage Gen1. Als beveiligde overdracht is ingeschakeld voor Blob Storage, zou de URI zijn wasbs://. Zie ook beveiligde overdracht.

Algemeen proces

Het proces op hoog niveau voor het inschakelen van uw Spark-cluster om een query uit te voeren op uw HBase-cluster is als volgt:

  1. Bereid enkele voorbeeldgegevens voor in HBase.
  2. Haal het hbase-site.xml bestand op uit de configuratiemap van uw HBase-cluster (/etc/hbase/conf) en plaats een kopie van hbase-site.xml in uw Spark 2-configuratiemap (/etc/spark2/conf). (OPTIONEEL: gebruik het script van het HDInsight-team om dit proces te automatiseren)
  3. Voer deze opdracht spark-shell uit om te verwijzen naar de Spark HBase-Verbinding maken or door de Maven-coördinaten in de packages optie.
  4. Definieer een catalogus waarmee het schema van Spark naar HBase wordt toegewezen.
  5. Interactie met de HBase-gegevens met behulp van de RDD- of DataFrame-API's.

Voorbeeldgegevens voorbereiden in Apache HBase

In deze stap maakt en vult u een tabel in Apache HBase die u vervolgens kunt opvragen met behulp van Spark.

  1. Gebruik de ssh opdracht om verbinding te maken met uw HBase-cluster. Bewerk de opdracht door de naam van uw HBase-cluster te vervangen HBASECLUSTER en voer vervolgens de opdracht in:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Gebruik de hbase shell opdracht om de interactieve HBase-shell te starten. Voer de volgende opdracht in uw SSH-verbinding in:

    hbase shell
    
  3. Gebruik de create opdracht om een HBase-tabel met twee kolomfamilies te maken. Voer de volgende opdracht in:

    create 'Contacts', 'Personal', 'Office'
    
  4. Gebruik de put opdracht om waarden in te voegen bij een opgegeven kolom in een opgegeven rij in een bepaalde tabel. Voer de volgende opdracht in:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Gebruik de exit opdracht om de interactieve HBase-shell te stoppen. Voer de volgende opdracht in:

    exit
    

Scripts uitvoeren om verbinding tussen clusters in te stellen

Als u de communicatie tussen clusters wilt instellen, volgt u de stappen voor het uitvoeren van twee scripts op uw clusters. Deze scripts automatiseren het proces van het kopiëren van bestanden dat wordt beschreven in de sectie Communicatie handmatig instellen.

  • Het script dat u uitvoert vanuit het HBase-cluster, uploadt hbase-site.xml en HBase IP-toewijzingsgegevens naar de standaardopslag die is gekoppeld aan uw Spark-cluster.
  • Met het script dat u uitvoert vanuit het Spark-cluster, worden twee cron-taken ingesteld om periodiek twee helperscripts uit te voeren:
    1. HBase Cron-taak: download nieuwe hbase-site.xml bestanden en HBase IP-toewijzing van het standaardopslagaccount van Spark naar het lokale knooppunt
    2. Spark-cron-taak: controleert of er een Spark-schaalaanpassing is opgetreden en of het cluster veilig is. Zo ja, bewerken /etc/hosts om HBase IP-toewijzing op te nemen die lokaal is opgeslagen

OPMERKING: Voordat u doorgaat, moet u ervoor zorgen dat u het opslagaccount van het Spark-cluster als secundair opslagaccount hebt toegevoegd aan uw HBase-cluster. Zorg ervoor dat u de scripts op volgorde plaatst zoals aangegeven.

  1. Gebruik scriptactie op uw HBase-cluster om de wijzigingen toe te passen met de volgende overwegingen:

    Eigenschappen Weergegeven als
    Bash-script-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Knooppunttype(n) Regio
    Parameters -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Persistent ja
    • SECONDARYS_STORAGE_URL is de URL van de standaardopslag aan de Spark-zijde. Parametervoorbeeld: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Gebruik scriptactie op uw Spark-cluster om de wijzigingen toe te passen met de volgende overwegingen:

    Eigenschappen Weergegeven als
    Bash-script-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Knooppunttype(n) Hoofd, Werker, Zookeeper
    Parameters -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Persistent ja
    • U kunt opgeven hoe vaak u wilt dat dit cluster automatisch controleert of deze wordt bijgewerkt. Standaard: -s "*/1 * * * *" -h 0 (in dit voorbeeld wordt de Spark-cron elke minuut uitgevoerd, terwijl de HBase-cron niet wordt uitgevoerd)
    • Omdat HBase Cron niet standaard is ingesteld, moet u dit script opnieuw uitvoeren wanneer u het schalen naar uw HBase-cluster uitvoert. Als uw HBase-cluster vaak wordt geschaald, kunt u ervoor kiezen om de HBase Cron-taak automatisch in te stellen. Configureert bijvoorbeeld -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" het script om elke 30 minuten controles uit te voeren. Hiermee wordt het HBase Cron-schema periodiek uitgevoerd om het downloaden van nieuwe HBase-gegevens in het algemene opslagaccount naar het lokale knooppunt te automatiseren.

Notitie

Deze scripts werken alleen op HDI 5.0- en HDI 5.1-clusters.

Communicatie handmatig instellen (optioneel, als het opgegeven script in de bovenstaande stap mislukt)

OPMERKING: Deze stappen moeten worden uitgevoerd telkens wanneer een van de clusters een schaalactiviteit ondergaat.

  1. Kopieer de hbase-site.xml van lokale opslag naar de hoofdmap van de standaardopslag van uw Spark-cluster. Bewerk de opdracht om uw configuratie weer te geven. Voer vervolgens vanuit uw geopende SSH-sessie naar het HBase-cluster de opdracht in:

    Syntaxiswaarde Nieuwe waarde
    URI-schema Pas deze aan om uw opslag weer te geven. De syntaxis is voor blobopslag waarvoor beveiligde overdracht is ingeschakeld.
    SPARK_STORAGE_CONTAINER Vervang door de standaardnaam van de opslagcontainer die wordt gebruikt voor het Spark-cluster.
    SPARK_STORAGE_ACCOUNT Vervang door de standaardnaam van het opslagaccount die wordt gebruikt voor het Spark-cluster.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Sluit vervolgens uw SSH-verbinding met uw HBase-cluster af.

    exit
    
  3. Verbinding maken naar het hoofdknooppunt van uw Spark-cluster met behulp van SSH. Bewerk de opdracht door de naam van uw Spark-cluster te vervangen SPARKCLUSTER en voer vervolgens de opdracht in:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Voer de opdracht in om vanuit de standaardopslag van uw Spark-cluster naar de spark 2-configuratiemap in de lokale opslag van het cluster te kopiëren hbase-site.xml :

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Spark Shell uitvoeren die verwijst naar de Spark HBase-Verbinding maken or

Nadat u de vorige stap hebt voltooid, moet u Spark-shell kunnen uitvoeren, waarbij u verwijst naar de juiste versie van Spark HBase Verbinding maken or.

In de volgende tabel ziet u bijvoorbeeld twee versies en de bijbehorende opdrachten die het HDInsight-team momenteel gebruikt. U kunt dezelfde versies voor uw clusters gebruiken als de versies van HBase en Spark hetzelfde zijn als aangegeven in de tabel.

  1. Voer in uw geopende SSH-sessie naar het Spark-cluster de volgende opdracht in om een Spark-shell te starten:

    Spark-versie HDI HBase-versie SHC-versie Opdracht
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Houd dit Spark Shell-exemplaar open en ga door met het definiëren van een catalogus en query. Als u de JAR's die overeenkomen met uw versies niet vindt in de SHC Core-opslagplaats, gaat u verder met lezen.

Voor volgende combinaties van Spark- en HBase-versies worden deze artefacten niet meer gepubliceerd op de bovenstaande opslagplaats. U kunt de JAR's rechtstreeks bouwen vanuit de GitHub-vertakking spark-hbase-connector . Als u bijvoorbeeld werkt met Spark 2.4 en HBase 2.1, voert u de volgende stappen uit:

  1. Kloon de opslagplaats:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Ga naar branch-2.4:

    git checkout branch-2.4
    
  3. Bouwen vanuit de vertakking (maakt een .jar-bestand):

    mvn clean package -DskipTests
    
  4. Voer de volgende opdracht uit (wijzig de .jar naam die overeenkomt met het .jar bestand dat u hebt gemaakt):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Houd dit Spark Shell-exemplaar open en ga door naar de volgende sectie.

Een catalogus en query definiëren

In deze stap definieert u een catalogusobject waarmee het schema van Apache Spark wordt toegewezen aan Apache HBase.

  1. Voer in uw geopende Spark Shell de volgende import instructies in:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Voer de onderstaande opdracht in om een catalogus te definiëren voor de tabel Contactpersonen die u hebt gemaakt in HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    De code:

    1. Hiermee definieert u een catalogusschema voor de HBase-tabel met de naam Contacts.
    2. Identificeert de rijsleutel als keyen wijs de kolomnamen die in Spark worden gebruikt toe aan de kolomfamilie, kolomnaam en kolomtype zoals gebruikt in HBase.
    3. Definieert de rijsleutel in detail als een benoemde kolom (rowkey), die een specifieke kolomfamilie cf van rowkey.
  3. Voer de opdracht in om een methode te definiëren die een DataFrame rond uw Contacts tabel in HBase biedt:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Maak een exemplaar van het DataFrame:

    val df = withCatalog(catalog)
    
  5. Voer een query uit op het DataFrame:

    df.show()
    

    U ziet nu twee rijen met gegevens:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registreer een tijdelijke tabel zodat u een query kunt uitvoeren op de HBase-tabel met behulp van Spark SQL:

    df.createTempView("contacts")
    
  7. Voer een SQL-query uit voor de contacts tabel:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Als het goed is, ziet u de volgende resultaten:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Nieuwe gegevens invoegen

  1. Als u een nieuwe record voor contactpersonen wilt invoegen, definieert u een ContactRecord klasse:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Maak een exemplaar van ContactRecord en plaats deze in een matrix:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Sla de matrix met nieuwe gegevens op in HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Bekijk de resultaten:

    df.show()
    

    De uitvoer ziet er als volgt uit:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Sluit de Spark-shell door de volgende opdracht in te voeren:

    :q
    

Volgende stappen