Použití Apache Sparku ke čtení a zápisu dat Apache HBase

Apache HBase se obvykle dotazuje pomocí rozhraní API nízké úrovně (prohledávání, získání a vkládání) nebo pomocí syntaxe SQL pomocí Apache Phoenixu. Apache také poskytuje Připojení or Apache Spark HBase. Připojení or je praktickou a efektivní alternativou k dotazování a úpravě dat uložených HBase.

Požadavky

Celkový proces

Proces vysoké úrovně pro povolení dotazování clusteru Spark na cluster HBase je následující:

  1. Připravte ukázková data v HBase.
  2. Získejte soubor hbase-site.xml ze složky konfigurace clusteru HBase (/etc/hbase/conf) a vložte kopii hbase-site.xml do konfigurační složky Sparku 2 (/etc/spark2/conf). (VOLITELNÉ: Použití skriptu poskytovaného týmem HDInsight k automatizaci tohoto procesu)
  3. Spusťte spark-shell odkazování na Spark HBase Připojení or podle souřadnic Mavenu packages v možnosti.
  4. Definujte katalog, který mapuje schéma ze Sparku na HBase.
  5. Interakce s daty HBase pomocí rozhraní API rdD nebo datového rámce

Příprava ukázkových dat v Apache HBase

V tomto kroku vytvoříte a naplníte tabulku v Apache HBase, kterou pak můžete dotazovat pomocí Sparku.

  1. ssh Pomocí příkazu se připojte ke clusteru HBase. Upravte příkaz nahrazením HBASECLUSTER názvu clusteru HBase a pak zadejte příkaz:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. hbase shell Pomocí příkazu spusťte interaktivní prostředí HBase. Do připojení SSH zadejte následující příkaz:

    hbase shell
    
  3. create Pomocí příkazu vytvořte tabulku HBase se dvěma sloupci. Zadejte tento příkaz:

    create 'Contacts', 'Personal', 'Office'
    
  4. put Pomocí příkazu vložte hodnoty do zadaného sloupce v zadaném řádku v konkrétní tabulce. Zadejte tento příkaz:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. exit Pomocí příkazu zastavte interaktivní prostředí HBase. Zadejte tento příkaz:

    exit
    

Spuštění skriptů pro nastavení připojení mezi clustery

Pokud chcete nastavit komunikaci mezi clustery, postupujte podle kroků pro spuštění dvou skriptů v clusterech. Tyto skripty automatizují proces kopírování souborů popsaný v části Nastavení komunikace ručně.

  • Skript, který spustíte z clusteru HBase, nahraje hbase-site.xml informace o mapování ip adres HBase do výchozího úložiště připojeného k vašemu clusteru Spark.
  • Skript, který spustíte z clusteru Spark, nastaví dvě úlohy cron, aby pravidelně spouštějí dva pomocné skripty:
    1. Úloha HBase cron – stažení nových hbase-site.xml souborů a mapování IP adres HBase z výchozího účtu úložiště Sparku na místní uzel
    2. Úloha Spark cron – kontroluje, jestli došlo ke škálování Sparku a jestli je cluster zabezpečený. Pokud ano, upravte /etc/hosts mapování IP adres HBase uložené místně.

POZNÁMKA: Než budete pokračovat, ujistěte se, že jste do clusteru HBase přidali účet úložiště clusteru Spark jako sekundární účet úložiště. Ujistěte se, že jsou skripty v uvedeném pořadí.

  1. Pomocí akce skriptu v clusteru HBase použijte změny s následujícími aspekty:

    Vlastnost Hodnota
    Identifikátor URI skriptu Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Typy uzlů Oblast
    Parametry -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Trvalé ano
    • SECONDARYS_STORAGE_URL je adresa URL výchozího úložiště na straně Sparku. Příklad parametru: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Pomocí akce skriptu v clusteru Spark použijte změny s následujícími aspekty:

    Vlastnost Hodnota
    Identifikátor URI skriptu Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Typy uzlů Vedoucí, Pracovník, Zookeeper
    Parametry -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Trvalé ano
    • Můžete určit, jak často má tento cluster automaticky kontrolovat, jestli se aktualizuje. Výchozí hodnota: -s "*/1 * * *" -h 0 (V tomto příkladu se spark cron spouští každou minutu, zatímco HBase cron se nespustí)
    • Vzhledem k tomu, že HBase cron není ve výchozím nastavení nastavený, je nutné tento skript spustit znovu při provádění škálování clusteru HBase. Pokud se cluster HBase často škáluje, můžete se rozhodnout nastavit úlohu HBase cron automaticky. Například: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" Nakonfiguruje skript tak, aby prováděl kontroly každých 30 minut. Tím se bude pravidelně spouštět plán HBase cron, který automatizuje stahování nových informací HBase o společném účtu úložiště do místního uzlu.

Poznámka:

Tyto skripty fungují pouze v clusterech HDI 5.0 a HDI 5.1.

Ruční nastavení komunikace (volitelné, pokud je zadaný skript v předchozím kroku neúspěšný)

POZNÁMKA: Tyto kroky je potřeba provést pokaždé, když jeden z clusterů prochází aktivitou škálování.

  1. Zkopírujte hbase-site.xml z místního úložiště do kořenového adresáře výchozího úložiště clusteru Spark. Upravte příkaz tak, aby odrážel vaši konfiguraci. Pak z otevřené relace SSH do clusteru HBase zadejte příkaz:

    Hodnota syntaxe Nová hodnota
    Schéma identifikátoru URI Upravte úložiště tak, aby odráželo vaše úložiště. Syntaxe je určená pro úložiště objektů blob s povoleným zabezpečeným přenosem.
    SPARK_STORAGE_CONTAINER Nahraďte výchozí název kontejneru úložiště používaným pro cluster Spark.
    SPARK_STORAGE_ACCOUNT Nahraďte výchozím názvem účtu úložiště používaným pro cluster Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Pak ukončete připojení ssh ke clusteru HBase.

    exit
    
  3. Připojení k hlavnímu uzlu clusteru Spark pomocí SSH. Upravte příkaz nahrazením SPARKCLUSTER názvu clusteru Spark a pak zadejte tento příkaz:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Zadejte příkaz pro zkopírování hbase-site.xml z výchozího úložiště clusteru Spark do konfigurační složky Sparku 2 v místním úložišti clusteru:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Spuštění Spark Shellu odkazující na spark HBase Připojení or

Po dokončení předchozího kroku byste měli být schopni spustit prostředí Spark a odkazovat na příslušnou verzi Spark HBase Připojení or.

V následující tabulce jsou uvedeny dvě verze a odpovídající příkazy, které tým HDInsight aktuálně používá. Stejné verze můžete použít pro clustery, pokud jsou verze HBase a Spark stejné jako v tabulce.

  1. V otevřené relaci SSH ke clusteru Spark zadejte následující příkaz, který spustí prostředí Spark:

    Verze Sparku Verze HDI HBase Verze SHC Příkaz
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Ponechte tuto instanci prostředí Spark otevřenou a pokračujte v definování katalogu a dotazu. Pokud nenajdete soubory JAR, které odpovídají vašim verzím v úložišti SHC Core, pokračujte ve čtení.

U následných kombinací verzí Sparku a HBase se tyto artefakty už nepublikují na výše uvedeném úložišti. Soubory JAR můžete sestavit přímo z větve GitHubu pro spark-hbase-connector . Pokud například používáte Spark 2.4 a HBase 2.1, proveďte tyto kroky:

  1. Naklonujte úložiště:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Přejděte na větev-2.4:

    git checkout branch-2.4
    
  3. Sestavte z větve (vytvoří soubor .jar):

    mvn clean package -DskipTests
    
  4. Spusťte následující příkaz (nezapomeňte změnit název .jar, který odpovídá vytvořenému souboru .jar):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Ponechte tuto instanci prostředí Spark otevřenou a pokračujte k další části.

Definování katalogu a dotazu

V tomto kroku definujete objekt katalogu, který mapuje schéma z Apache Sparku na Apache HBase.

  1. V otevřeném prostředí Spark Shell zadejte následující import příkazy:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Zadáním následujícího příkazu definujte katalog pro tabulku Kontaktů, kterou jste vytvořili v HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Kód:

    1. Definuje schéma katalogu pro tabulku HBase s názvem Contacts.
    2. Identifikuje klíč řádku jako keya mapuje názvy sloupců použité ve Sparku na rodinu sloupců, název sloupce a typ sloupce, jak se používá v HBase.
    3. Definuje detailně klíč řádku jako pojmenovaný sloupec (rowkey), který má určitou řadu cfrowkeysloupců .
  3. Zadáním příkazu definujte metodu, která poskytuje datový rámec kolem Contacts tabulky v HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Vytvořte instanci datového rámce:

    val df = withCatalog(catalog)
    
  5. Dotaz na datový rámec:

    df.show()
    

    Měli byste vidět dva řádky dat:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Zaregistrujte dočasnou tabulku, abyste mohli dotazovat tabulku HBase pomocí Spark SQL:

    df.createTempView("contacts")
    
  7. Zadejte dotaz SQL na contacts tabulku:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Výsledky by se měly zobrazit takto:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Vložení nových dat

  1. Pokud chcete vložit nový záznam kontaktu, definujte ContactRecord třídu:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Vytvořte instanci ContactRecord a vložte ji do pole:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Uložte pole nových dat do HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Prozkoumejte výsledky:

    df.show()
    

    Měl by se zobrazit výstup podobný tomuto:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Zavřete prostředí Spark zadáním následujícího příkazu:

    :q
    

Další kroky