Odczytywanie i zapisywanie danych w bazie danych Apache HBase za pomocą platformy Apache Spark

Baza danych Apache HBase jest zwykle odpytywane przy użyciu interfejsu API niskiego poziomu (skanowanie, pobieranie i stawianie) lub składnię SQL przy użyciu platformy Apache Phoenix. Platforma Apache udostępnia również Połączenie or platformy Apache Spark HBase. Połączenie or to wygodna i wydajna alternatywa do wykonywania zapytań i modyfikowania danych przechowywanych przez bazę danych HBase.

Wymagania wstępne

  • Dwa oddzielne klastry usługi HDInsight wdrożone w tej samej sieci wirtualnej. Jedna baza danych HBase i jedna platforma Spark z zainstalowaną platformą Spark 2.1 (HDInsight 3.6). Aby uzyskać więcej informacji, zobacz Tworzenie klastrów opartych na systemie Linux w usłudze HDInsight przy użyciu witryny Azure Portal.

  • Schemat identyfikatora URI dla magazynu podstawowego klastrów. Ten schemat będzie wasb:// dla usługi Azure Blob Storage, abfs:// dla usługi Azure Data Lake Storage Gen2 lub adl:// dla usługi Azure Data Lake Storage Gen1. Jeśli bezpieczny transfer jest włączony dla usługi Blob Storage, identyfikator URI to wasbs://. Zobacz również bezpieczny transfer.

Ogólny proces

Ogólny proces włączania klastra Spark do wykonywania zapytań względem klastra HBase jest następujący:

  1. Przygotuj przykładowe dane w bazie HBase.
  2. Uzyskaj plik hbase-site.xml z folderu konfiguracji klastra HBase (/etc/hbase/conf) i umieść kopię hbase-site.xml w folderze konfiguracji platformy Spark 2 (/etc/spark2/conf). (OPCJONALNIE: użyj skryptu dostarczonego przez zespół usługi HDInsight, aby zautomatyzować ten proces)
  3. Uruchom spark-shell odwoływanie się do Połączenie usługi Spark HBase według współrzędnych narzędzia Maven w packages opcji.
  4. Zdefiniuj katalog mapujący schemat z platformy Spark na bazę danych HBase.
  5. Interakcja z danymi bazy danych HBase przy użyciu interfejsów API RDD lub DataFrame.

Przygotowywanie przykładowych danych w bazie danych Apache HBase

W tym kroku utworzysz i wypełnisz tabelę w bazie danych Apache HBase, którą następnie możesz wykonać za pomocą platformy Spark.

  1. Użyj polecenia , ssh aby nawiązać połączenie z klastrem HBase. Edytuj polecenie, zastępując HBASECLUSTER ciąg nazwą klastra HBase, a następnie wprowadź polecenie:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Użyj polecenia , hbase shell aby uruchomić interaktywną powłokę HBase. Wprowadź następujące polecenie w połączeniu SSH:

    hbase shell
    
  3. Użyj polecenia , create aby utworzyć tabelę HBase z dwiema rodzinami kolumn. Podaj następujące polecenie:

    create 'Contacts', 'Personal', 'Office'
    
  4. put Użyj polecenia , aby wstawić wartości w określonej kolumnie w określonym wierszu w określonej tabeli. Podaj następujące polecenie:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Użyj polecenia , exit aby zatrzymać interaktywną powłokę HBase. Podaj następujące polecenie:

    exit
    

Uruchamianie skryptów w celu skonfigurowania połączenia między klastrami

Aby skonfigurować komunikację między klastrami, wykonaj kroki uruchamiania dwóch skryptów w klastrach. Te skrypty automatyzują proces kopiowania plików opisany w sekcji "Konfigurowanie komunikacji ręcznie".

  • Skrypt uruchamiany z klastra HBase przekaże hbase-site.xml informacje o mapowaniu adresów IP bazy danych HBase do domyślnego magazynu dołączonego do klastra Spark.
  • Skrypt uruchamiany z klastra Spark konfiguruje dwa zadania cron, aby okresowo uruchamiać dwa skrypty pomocnicze:
    1. Zadanie cron bazy danych HBase — pobieranie nowych hbase-site.xml plików i mapowanie adresów IP bazy danych HBase z domyślnego konta magazynu platformy Spark na węzeł lokalny
    2. Zadanie cron platformy Spark — sprawdza, czy wystąpiło skalowanie platformy Spark i czy klaster jest bezpieczny. Jeśli tak, edytuj /etc/hosts , aby uwzględnić mapowanie adresów IP bazy danych HBase przechowywane lokalnie

UWAGA: Przed kontynuowaniem upewnij się, że konto magazynu klastra Spark zostało dodane do klastra HBase jako pomocnicze konto magazynu. Upewnij się, że skrypty są uporządkowane zgodnie ze wskazaniem.

  1. Użyj akcji skryptu w klastrze HBase, aby zastosować zmiany z następującymi zagadnieniami:

    Właściwości Wartość
    Identyfikator URI skryptu powłoki Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Typy węzłów Region (Region)
    Parametry -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Trwały tak
    • SECONDARYS_STORAGE_URL to adres URL domyślnego magazynu po stronie platformy Spark. Przykład parametru: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Użyj akcji skryptu w klastrze Spark, aby zastosować zmiany z następującymi zagadnieniami:

    Właściwości Wartość
    Identyfikator URI skryptu powłoki Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Typy węzłów Szef, Pracownik, Zookeeper
    Parametry -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Trwały tak
    • Możesz określić, jak często ten klaster ma automatycznie sprawdzać, czy aktualizacja jest aktualizowana. Wartość domyślna: -s "*/1 * *" -h 0 (w tym przykładzie cron platformy Spark jest uruchamiany co minutę, podczas gdy cron bazy danych HBase nie jest uruchomiony)
    • Ponieważ cron bazy danych HBase nie jest domyślnie skonfigurowany, należy ponownie uruchomić ten skrypt podczas skalowania do klastra HBase. Jeśli klaster HBase jest często skalowany, możesz wybrać automatyczne skonfigurowanie zadania Cron bazy danych HBase. Na przykład: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" konfiguruje skrypt do przeprowadzania kontroli co 30 minut. Spowoduje to okresowe uruchomienie harmonogramu Cron bazy danych HBase w celu zautomatyzowania pobierania nowych informacji bazy danych HBase na wspólnym koncie magazynu do węzła lokalnego.

Uwaga

Te skrypty działają tylko w klastrach HDI 5.0 i HDI 5.1.

Ręczne konfigurowanie komunikacji (opcjonalnie, jeśli podany skrypt w powyższym kroku zakończy się niepowodzeniem)

UWAGA: Te kroki należy wykonać za każdym razem, gdy jeden z klastrów przechodzi działanie skalowania.

  1. Skopiuj hbase-site.xml z magazynu lokalnego do katalogu głównego domyślnego magazynu klastra Spark. Edytuj polecenie, aby odzwierciedlić konfigurację. Następnie w otwartej sesji SSH do klastra HBase wprowadź polecenie:

    Wartość składniowa Nowa wartość
    Schemat identyfikatora URI Zmodyfikuj, aby odzwierciedlić magazyn. Składnia dotyczy magazynu obiektów blob z włączonym bezpiecznym transferem.
    SPARK_STORAGE_CONTAINER Zastąp ciąg domyślną nazwą kontenera magazynu używaną dla klastra Spark.
    SPARK_STORAGE_ACCOUNT Zastąp ciąg domyślną nazwą konta magazynu używaną dla klastra Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Następnie zamknij połączenie SSH z klastrem HBase.

    exit
    
  3. Połączenie do węzła głównego klastra Spark przy użyciu protokołu SSH. Zmodyfikuj polecenie, zastępując SPARKCLUSTER ciąg nazwą klastra Spark, a następnie wprowadź polecenie:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Wprowadź polecenie, aby skopiować hbase-site.xml z domyślnego magazynu klastra Spark do folderu konfiguracji platformy Spark 2 w magazynie lokalnym klastra:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Uruchamianie powłoki Spark odwołującej się do Połączenie or spark HBase

Po wykonaniu poprzedniego kroku powinno być możliwe uruchomienie powłoki Spark, odwołując się do odpowiedniej wersji Połączenie or spark HBase.

Na przykład w poniższej tabeli wymieniono dwie wersje i odpowiednie polecenia, których obecnie używa zespół usługi HDInsight. Możesz użyć tych samych wersji dla klastrów, jeśli wersje bazy danych HBase i platformy Spark są takie same jak w tabeli.

  1. W otwartej sesji SSH w klastrze Spark wprowadź następujące polecenie, aby uruchomić powłokę spark:

    Wersja platformy Spark Wersja HBase usługi HDI Wersja usługi SHC Polecenie
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Pozostaw otwarte to wystąpienie powłoki Spark i kontynuuj definiowanie katalogu i zapytania. Jeśli nie znajdziesz plików jar odpowiadających twoim wersjom w repozytorium SHC Core, kontynuuj czytanie.

W przypadku kolejnych kombinacji wersji platformy Spark i bazy danych HBase te artefakty nie są już publikowane w powyższym repozytorium. Pliki jar można tworzyć bezpośrednio z gałęzi GitHub spark-hbase-connector . Jeśli na przykład korzystasz z platform Spark 2.4 i HBase 2.1, wykonaj następujące kroki:

  1. Sklonuj repozytorium:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Przejdź do gałęzi-2.4:

    git checkout branch-2.4
    
  3. Skompiluj z gałęzi (tworzy plik .jar):

    mvn clean package -DskipTests
    
  4. Uruchom następujące polecenie (pamiętaj, aby zmienić nazwę .jar odpowiadającą skompilowanemu plikowi .jar):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Pozostaw to wystąpienie powłoki Spark otwarte i przejdź do następnej sekcji.

Definiowanie wykazu i zapytania

W tym kroku zdefiniujesz obiekt wykazu, który mapuje schemat z platformy Apache Spark na bazę danych Apache HBase.

  1. W otwartej powłoce Spark wprowadź następujące import instrukcje:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Wprowadź poniższe polecenie, aby zdefiniować wykaz dla tabeli Kontakty utworzonej w bazie HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Kod:

    1. Definiuje schemat wykazu dla tabeli HBase o nazwie Contacts.
    2. Identyfikuje klucz wiersza jako keyi mapuje nazwy kolumn używane na platformie Spark do rodziny kolumn, nazwy kolumn i typu kolumny, jak użyto w bazie HBase.
    3. Definiuje szczegółowo klucz wiersza jako nazwaną kolumnę (rowkey), która ma określoną rodzinę cfrowkeykolumn .
  3. Wprowadź polecenie , aby zdefiniować metodę, która udostępnia ramkę danych wokół Contacts tabeli w bazie HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Utwórz wystąpienie ramki danych:

    val df = withCatalog(catalog)
    
  5. Wykonywanie zapytań względem ramki danych:

    df.show()
    

    Powinny zostać wyświetlone dwa wiersze danych:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Zarejestruj tabelę tymczasową, aby móc wykonywać zapytania dotyczące tabeli HBase przy użyciu usługi Spark SQL:

    df.createTempView("contacts")
    
  7. Wykonaj zapytanie SQL względem contacts tabeli:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Powinny zostać wyświetlone następujące wyniki:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Wstawianie nowych danych

  1. Aby wstawić nowy rekord Kontakt, zdefiniuj klasę ContactRecord :

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Utwórz wystąpienie klasy ContactRecord i umieść je w tablicy:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Zapisz tablicę nowych danych w bazie danych HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Sprawdź wyniki:

    df.show()
    

    Powinny pojawić się następujące dane wyjściowe:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Zamknij powłokę spark, wprowadzając następujące polecenie:

    :q
    

Następne kroki