Verwenden von Apache Spark zum Lesen und Schreiben von Apache HBase-Daten

Apache HBase wird üblicherweise über die Low-Level-API (scan-, get- und put-Abfragen) oder mit einer SQL-Syntax unter Verwendung von Apache Phoenix abgefragt. Apache bietet außerdem den Apache Spark HBase-Connector. Der Connector ist eine praktische und effiziente Alternative zum Abfragen und Ändern von Daten, die von HBase gespeichert wurden.

Voraussetzungen

Übersicht über den Prozess

Der allgemeine Prozess zum Aktivieren Ihres Spark-Clusters für die Abfrage Ihres HBase-Clusters sieht wie folgt aus:

  1. Bereiten Sie einige Beispieldaten in HBase vor.
  2. Rufen Sie die Datei „hbase-site.xml“ aus dem Konfigurationsordner Ihres HBase-Clusters (/etc/HBase/conf) ab, und speichern Sie eine Kopie der Datei „hbase-site.xml“ in Ihrem Spark 2-Konfigurationsordner (/etc/spark2/conf). (OPTIONAL: Verwenden Sie das vom HDInsight-Team bereitgestellte Skript, um diesen Vorgang zu automatisieren.)
  3. Führen Sie spark-shell aus, und verweisen Sie in der Option packages mit den Maven-Koordinaten auf den Spark HBase-Connector.
  4. Definieren Sie einen Katalog, der das Spark-Schema zu HBase zuordnet.
  5. Interagieren Sie entweder über die RDD- oder die Dataframe-APIs mit den HBase-Daten.

Vorbereiten von Beispieldaten in Apache HBase

In diesem Schritt erstellen Sie eine Tabelle in Apache HBase und füllen sie auf. Diese Tabelle können Sie dann mit Spark abfragen.

  1. Verwenden Sie zum Herstellen der Verbindung mit Ihrem HBase-Cluster ssh. Bearbeiten Sie den Befehl, indem Sie HBASECLUSTER durch den Namen Ihres HBase-Clusters ersetzen, und geben Sie den Befehl dann ein:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Verwenden Sie den Befehl hbase shell, um die interaktive HBase-Shell zu starten. Geben Sie den folgenden Befehl in Ihrer SSH-Verbindung ein:

    hbase shell
    
  3. Verwenden Sie den Befehl create, um eine HBase-Tabelle mit zwei Spaltenfamilien zu erstellen. Geben Sie den folgenden Befehl ein:

    create 'Contacts', 'Personal', 'Office'
    
  4. Verwenden Sie den Befehl put, um Werte in einer angegebenen Spalte einer angegebenen Zeile in einer bestimmten Tabelle einzufügen. Geben Sie den folgenden Befehl ein:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Verwenden Sie den Befehl exit, um die interaktive HBase-Shell zu beenden. Geben Sie den folgenden Befehl ein:

    exit
    

Ausführen von Skripts zum Einrichten der Verbindung zwischen Clustern

Führen Sie zum Einrichten der Kommunikation zwischen den Clustern die folgenden Schritte durch, um zwei Skripts in Ihren Clustern auszuführen. Mit diesen Skripts wird der Dateikopiervorgang (siehe Beschreibung im Abschnitt „Manuelles Einrichten der Kommunikation“) automatisiert.

  • Mit dem Skript, das Sie im HBase-Cluster ausführen, werden die Datei hbase-site.xml sowie HBase-IP-Zuordnungsinformationen in den Standardspeicher hochgeladen, der dem Spark-Cluster zugeordnet ist.
  • Mit dem im Spark-Cluster ausgeführten Skript werden zwei Cronjobs eingerichtet, die regelmäßig zwei Hilfsskripts ausführen:
    1. HBase-Cronjob: Herunterladen neuer Versionen der Datei hbase-site.xml und der HBase-IP-Zuordnung aus dem Spark-Standardspeicherkonto auf den lokalen Knoten.
    2. Spark-Cronjob: Überprüfen, ob eine Spark-Skalierung durchgeführt wurde und der Cluster sicher ist. Wenn dies der Fall ist, bearbeiten Sie /etc/hosts, um die lokal gespeicherte HBase-IP-Zuordnung einzuschließen.

HINWEIS: Bevor Sie den Vorgang fortsetzen, müssen Sie sicherstellen, dass Sie das Speicherkonto des Spark-Clusters als sekundäres Speicherkonto zum HBase-Cluster hinzugefügt haben. Achten Sie darauf, dass Sie die Skripts in der angegebenen Reihenfolge ausführen.

  1. Verwenden Sie in Ihrem HBase-Cluster eine Skriptaktion, um die Änderungen unter folgenden Aspekten anzuwenden:

    Eigenschaft Wert
    Bash-Skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Knotentyp(en) Region
    Parameter -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Persistent ja
    • SECONDARYS_STORAGE_URL ist der URL des Spark-Standardspeichers. Parameterbeispiel: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Verwenden Sie in Ihrem Spark-Cluster eine Skriptaktion, um die Änderungen unter folgenden Aspekten anzuwenden:

    Eigenschaft Wert
    Bash-Skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Knotentyp(en) Head, Worker, Zookeeper
    Parameter -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Persistent ja
    • Sie können angeben, wie oft dieser Cluster automatisch auf Updates überprüfen soll. Standard: -s “*/1 * * * *” -h 0 (In diesem Beispiel wird der Spark-Cronjob jede Minute und der HBase-Cronjob überhaupt nicht ausgeführt)
    • Da der HBase-Cron standardmäßig nicht eingerichtet ist, müssen Sie dieses Skript erneut ausführen, wenn Sie eine Skalierung Ihres HBase-Clusters vornehmen. Wenn Sie Ihren HBase-Cluster häufig skalieren, werden Sie sich wahrscheinlich für die automatische Einrichtung des HBase-Cronjobs entscheiden. Beispiel: Mit -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" wird das Skript so konfiguriert, dass alle 30 Minuten Überprüfungen durchgeführt werden. Dadurch wird der HBase-Cronjob regelmäßig nach Zeitplan ausgeführt, und das Herunterladen neuer HBase-Informationen aus dem allgemeinen Speicherkonto auf den lokalen Knoten wird automatisiert.

Hinweis

Diese Skripts funktionieren nur auf HDI 5.0- und HDI 5.1-Clustern.

Manuelles Einrichten der Kommunikation (optionale Vorgehensweise bei Fehlfunktion des oben genannten Skripts)

HINWEIS: Diese Schritte müssen jedes Mal ausgeführt werden, wenn für einen der Cluster ein Skalierungsvorgang durchgeführt wird.

  1. Kopieren Sie die Datei „hbase-site.xml“ aus dem lokalen Speicher in das Stammverzeichnis des Standardspeichers Ihres Spark-Clusters. Bearbeiten Sie den Befehl, um ihn an Ihre Konfiguration anzupassen. Geben Sie anschließend in Ihrer geöffneten SSH-Sitzung diesen Befehl für den HBase-Cluster ein:

    Syntaxwert Neuer Wert
    URI-Schema Nehmen Sie Änderungen zur Anpassung an Ihren Speicher vor. Die angegebene Syntax gilt für Blobspeicher mit aktivierter sicherer Übertragung.
    SPARK_STORAGE_CONTAINER Fügen Sie den Namen Ihres Standardspeichercontainers ein, der für den Spark-Cluster verwendet wird.
    SPARK_STORAGE_ACCOUNT Fügen Sie den Namen Ihres Standardspeicherkontos ein, der für den Spark-Cluster verwendet wird.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Trennen Sie dann die SSH-Verbindung mit Ihrem HBase-Cluster.

    exit
    
  3. Stellen Sie über SSH eine Verbindung mit dem Hauptknoten Ihres Spark-Clusters her. Bearbeiten Sie den Befehl, indem Sie SPARKCLUSTER durch den Namen Ihres Spark-Clusters ersetzen, und führen Sie dann den Befehl aus:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Geben Sie den folgenden Befehl ein, um hbase-site.xml aus dem Standardspeicher Ihres Spark-Clusters in den Spark 2-Konfigurationsordner im lokalen Speicher des Clusters zu kopieren:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Ausführen der Spark-Shell und Verweisen auf den Spark HBase-Connector

Nachdem Sie den vorherigen Schritt abgeschlossen haben, sollten Sie die Spark-Shell ausführen können, die auf die entsprechende Version des Spark HBase-Connectors verweist.

In der folgenden Tabelle werden beispielsweise zwei Versionen und die entsprechenden Befehle aufgelistet, die vom HDInsight-Team derzeit verwendet werden. Sie können die gleichen Versionen für Ihre Cluster verwenden, wenn die Versionen von HBase und Spark den in der Tabelle aufgeführten entsprechen.

  1. Geben Sie in Ihrer geöffneten SSH-Sitzung für den Spark-Cluster den folgenden Befehl ein, um eine Spark-Shell zu starten:

    Spark-Version HDI HBase-Version SHC-Version Get-Help
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Lassen Sie diese Instanz der Spark-Shell geöffnet, und fahren Sie mit dem Definieren von Katalog und Abfrage fort. Wenn Sie die JAR-Dateien nicht finden, die Ihren Versionen im SHC Core-Respository entsprechen, lesen Sie weiter.

Bei nachfolgenden Kombinationen von Spark- und HBase-Versionen werden diese Artefakte nicht mehr im obigen Repository veröffentlicht. Sie können die JAR-Dateien direkt über die GitHub-Verzweigung spark-hbase-connector erstellen. Führen Sie beispielsweise die folgenden Schritte aus, wenn Sie Spark 2.4 und HBase 2.1 verwenden:

  1. Klonen Sie das Repository:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Wechseln Sie zu „branch-2.4“:

    git checkout branch-2.4
    
  3. Nehmen Sie die Erstellung auf der Grundlage des Branch vor (Erstellung einer JAR-Datei):

    mvn clean package -DskipTests
    
  4. Führen Sie den folgenden Befehl aus. (Ändern Sie unbedingt den JAR-Namen, der der von Ihnen erstellten JAR-Datei entspricht.)

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Lassen Sie diese Instanz der Spark-Shell geöffnet, und fahren Sie mit dem nächsten Abschnitt fort.

Definieren von Katalog und Abfrage

In diesem Schritt definieren Sie ein Katalogobjekt, das das Apache Spark-Schema Apache HBase zuordnet.

  1. Geben Sie in Ihrer geöffneten Spark-Shell die folgenden import-Anweisungen ein:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Geben Sie den folgenden Befehl ein, um einen Katalog für die Tabelle „Kontakte“ zu definieren, die Sie in HBase erstellt haben:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Der Code:

    1. definiert ein Katalogschema für die HBase-Tabelle namens Contacts.
    2. identifiziert das rowkey-Element als key und ordnet die in Spark verwendeten Spaltennamen der Spaltenfamilie, dem Spaltennamen und dem Spaltentyp zu, die in HBase verwendet werden.
    3. definiert den rowkey im Detail als benannte Spalte (rowkey), die über eine spezifische cf-Spaltenfamilie rowkey verfügt.
  3. Geben Sie den folgenden Befehl ein, um eine Methode zu definieren, die einen Datenrahmen um Ihre Contacts-Tabelle in HBase bereitstellt:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Erstellen Sie eine Instanz des Datenrahmens:

    val df = withCatalog(catalog)
    
  5. Fragen Sie den Datenrahmen ab:

    df.show()
    

    Es sollten zwei Zeilen mit Daten angezeigt werden:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registrieren Sie eine temporäre Tabelle, sodass Sie die HBase-Tabelle mithilfe von Spark SQL abfragen können:

    df.createTempView("contacts")
    
  7. Erstellen Sie eine SQL-Abfrage für die contacts-Tabelle:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Die Ergebnisse sollten wie folgt aussehen:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Einfügen neuer Daten

  1. Um einen neuen Kontaktdatensatz einzufügen, definieren Sie eine ContactRecord-Klasse:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Erstellen Sie eine Instanz von ContactRecord, und fügen Sie sie in ein Array ein:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Speichern Sie das Array mit den neuen Daten in HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Untersuchen Sie die Ergebnisse:

    df.show()
    

    Es sollte in etwa folgende Ausgabe angezeigt werden:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Geben Sie den folgenden Befehl ein, um die Spark-Shell zu schließen:

    :q
    

Nächste Schritte