Använda Apache Spark för att läsa och skriva Apache HBase-data

Apache HBase efterfrågas vanligtvis antingen med sitt lågnivå-API (genomsökningar, hämtar och placerar) eller med en SQL-syntax med hjälp av Apache Phoenix. Apache tillhandahåller även Apache Spark HBase-Anslut eller. Anslut eller är ett bekvämt och effektivt alternativ för att fråga efter och ändra data som lagras av HBase.

Förutsättningar

Övergripande process

Den övergripande processen för att aktivera Spark-klustret för att fråga ditt HBase-kluster är följande:

  1. Förbered några exempeldata i HBase.
  2. Hämta hbase-site.xml-filen från konfigurationsmappen för HBase-klustret (/etc/hbase/conf) och placera en kopia av hbase-site.xml i konfigurationsmappen för Spark 2 (/etc/spark2/conf). (VALFRITT: använd skript som tillhandahålls av HDInsight-teamet för att automatisera den här processen)
  3. Kör spark-shell och referera till Spark HBase-Anslut eller med dess Maven-koordinater i alternativet packages .
  4. Definiera en katalog som mappar schemat från Spark till HBase.
  5. Interagera med HBase-data med rdd- eller DataFrame-API:er.

Förbereda exempeldata i Apache HBase

I det här steget skapar och fyller du i en tabell i Apache HBase som du sedan kan köra frågor mot med Spark.

  1. ssh Använd kommandot för att ansluta till ditt HBase-kluster. Redigera kommandot genom att HBASECLUSTER ersätta med namnet på ditt HBase-kluster och ange sedan kommandot:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. hbase shell Använd kommandot för att starta det interaktiva HBase-gränssnittet. Ange följande kommando i din SSH-anslutning:

    hbase shell
    
  3. create Använd kommandot för att skapa en HBase-tabell med tvåkolumnsfamiljer. Ange följande kommando:

    create 'Contacts', 'Personal', 'Office'
    
  4. put Använd kommandot för att infoga värden i en angiven kolumn på en angiven rad i en viss tabell. Ange följande kommando:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. exit Använd kommandot för att stoppa det interaktiva HBase-gränssnittet. Ange följande kommando:

    exit
    

Kör skript för att konfigurera anslutning mellan kluster

Om du vill konfigurera kommunikationen mellan kluster följer du stegen för att köra två skript i dina kluster. Dessa skript automatiserar processen för filkopiering som beskrivs i avsnittet Konfigurera kommunikation manuellt.

  • Skriptet som du kör från HBase-klustret laddar upp hbase-site.xml och HBase IP-mappningsinformation till standardlagringen som är kopplad till ditt Spark-kluster.
  • Skriptet som du kör från Spark-klustret konfigurerar två cron-jobb för att köra två hjälpskript med jämna mellanrum:
    1. HBase cron-jobb – ladda ned nya hbase-site.xml filer och HBase IP-mappning från Spark-standardlagringskontot till den lokala noden
    2. Spark cron-jobb – kontrollerar om en Spark-skalning har inträffat och om klustret är säkert. I så fall redigerar /etc/hosts du för att inkludera HBase IP-mappning som lagras lokalt

Obs! Kontrollera att du har lagt till Spark-klustrets lagringskonto i HBase-klustret som ett sekundärt lagringskonto innan du fortsätter. Kontrollera att skripten är i ordning enligt angiven ordning.

  1. Använd Skriptåtgärd i ditt HBase-kluster för att tillämpa ändringarna med följande överväganden:

    Property Värde
    Bash-skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Nodtyper Region
    Parametrar -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Framhärdade ja
    • SECONDARYS_STORAGE_URL är url:en för Standardlagring på Spark-sidan. Parameterexempel: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Använd Skriptåtgärd i Spark-klustret för att tillämpa ändringarna med följande överväganden:

    Property Värde
    Bash-skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Nodtyper Chef, Arbetare, Zookeeper
    Parametrar -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Framhärdade ja
    • Du kan ange hur ofta du vill att det här klustret ska kontrollera om det uppdateras automatiskt. Standard: -s "*/1 * * * *" -h 0 (I det här exemplet körs Spark cron varje minut, medan HBase-cronen inte körs)
    • Eftersom HBase cron inte har konfigurerats som standard måste du köra skriptet igen när du utför skalning till ditt HBase-kluster. Om ditt HBase-kluster skalas ofta kan du välja att konfigurera HBase cron-jobb automatiskt. Till exempel: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" konfigurerar skriptet för att utföra kontroller var 30:e minut. Detta kör HBase cron-schema regelbundet för att automatisera nedladdningen av ny HBase-information på det gemensamma lagringskontot till den lokala noden.

Kommentar

Dessa skript fungerar endast på HDI 5.0- och HDI 5.1-kluster.

Konfigurera kommunikation manuellt (valfritt, om det angivna skriptet i ovanstående steg misslyckas)

Obs! Dessa steg måste utföras varje gång ett av klustren genomgår en skalningsaktivitet.

  1. Kopiera hbase-site.xml från lokal lagring till roten för Spark-klustrets standardlagring. Redigera kommandot för att återspegla konfigurationen. Ange sedan kommandot från den öppna SSH-sessionen till HBase-klustret:

    Syntaxvärde Nytt värde
    URI-schema Ändra för att återspegla lagringen. Syntaxen är för bloblagring med säker överföring aktiverad.
    SPARK_STORAGE_CONTAINER Ersätt med standardnamnet för lagringscontainern som används för Spark-klustret.
    SPARK_STORAGE_ACCOUNT Ersätt med standardnamnet för lagringskontot som används för Spark-klustret.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Avsluta sedan ssh-anslutningen till HBase-klustret.

    exit
    
  3. Anslut till huvudnoden i spark-klustret med hjälp av SSH. Redigera kommandot genom att SPARKCLUSTER ersätta med namnet på ditt Spark-kluster och ange sedan kommandot:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Ange kommandot för att kopiera hbase-site.xml från Spark-klustrets standardlagring till Spark 2-konfigurationsmappen på klustrets lokala lagring:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Kör Spark Shell som refererar till Spark HBase-Anslut eller

När du har slutfört föregående steg bör du kunna köra Spark-gränssnittet och referera till lämplig version av Spark HBase-Anslut eller.

I följande tabell visas till exempel två versioner och motsvarande kommandon som HDInsight-teamet använder för närvarande. Du kan använda samma versioner för dina kluster om versionerna av HBase och Spark är samma som i tabellen.

  1. I den öppna SSH-sessionen till Spark-klustret anger du följande kommando för att starta ett Spark-gränssnitt:

    Spark-version HDI HBase-version SHC-version Command
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Håll den här Spark Shell-instansen öppen och fortsätt att definiera en katalog och fråga. Om du inte hittar jar-filerna som motsvarar dina versioner på SHC Core-lagringsplatsen fortsätter du att läsa.

För efterföljande kombinationer av Spark- och HBase-versioner publiceras inte längre dessa artefakter på lagringsplatsen ovan. Du kan skapa jar-filerna direkt från GitHub-grenen spark-hbase-connector . Om du till exempel kör med Spark 2.4 och HBase 2.1 utför du följande steg:

  1. Klona lagringsplatsen:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Gå till branch-2.4:

    git checkout branch-2.4
    
  3. Skapa från grenen (skapar en .jar-fil):

    mvn clean package -DskipTests
    
  4. Kör följande kommando (se till att ändra det .jar namn som motsvarar den .jar fil som du skapade):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Håll den här Spark Shell-instansen öppen och fortsätt till nästa avsnitt.

Definiera en katalog och fråga

I det här steget definierar du ett katalogobjekt som mappar schemat från Apache Spark till Apache HBase.

  1. I ditt öppna Spark Shell anger du följande import instruktioner:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Ange kommandot nedan för att definiera en katalog för tabellen Kontakter som du skapade i HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Koden:

    1. Definierar ett katalogschema för HBase-tabellen med namnet Contacts.
    2. Identifierar radnyckeln som keyoch mappar kolumnnamnen som används i Spark till kolumnfamiljen, kolumnnamnet och kolumntypen som används i HBase.
    3. Definierar radnyckeln i detalj som en namngiven kolumn (rowkey), som har en specifik kolumnfamilj cf med rowkey.
  3. Ange kommandot för att definiera en metod som tillhandahåller en DataFrame runt tabellen Contacts i HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Skapa en instans av DataFrame:

    val df = withCatalog(catalog)
    
  5. Fråga DataFrame:

    df.show()
    

    Du bör se två rader med data:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registrera en tillfällig tabell så att du kan köra frågor mot HBase-tabellen med Spark SQL:

    df.createTempView("contacts")
    
  7. Utfärda en SQL-fråga mot contacts tabellen:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Du bör se resultat som dessa:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Infoga nya data

  1. Om du vill infoga en ny kontaktpost definierar du en ContactRecord klass:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Skapa en instans av ContactRecord och placera den i en matris:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Spara matrisen med nya data till HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Granska resultaten:

    df.show()
    

    Du bör se utdata som ser ut så här:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Stäng Spark-gränssnittet genom att ange följande kommando:

    :q
    

Nästa steg