Verwenden von Apache Spark zum Lesen und Schreiben von Apache HBase-Daten
Apache HBase wird üblicherweise über die Low-Level-API (scan-, get- und put-Abfragen) oder mit einer SQL-Syntax unter Verwendung von Apache Phoenix abgefragt. Apache bietet außerdem den Apache Spark HBase-Connector. Der Connector ist eine praktische und effiziente Alternative zum Abfragen und Ändern von Daten, die von HBase gespeichert wurden.
Voraussetzungen
Zwei separate HDInsight-Cluster, die im selben virtuellen Netzwerk bereitgestellt wurden: einen HBase-Cluster und einen Spark-Cluster mit Spark 2.1 (HDInsight 3.6) als Mindestversion. Weitere Informationen finden Sie unter Erstellen von Linux-basierten Clustern in HDInsight mithilfe des Azure-Portals.
Das URI-Schema für Ihren primären Clusterspeicher. Dieses Schema ist „wasb://“ für Azure Blob Storage, „
abfs://
“ für Azure Data Lake Storage Gen2 oder „adl://“ für Azure Data Lake Storage Gen1. Wenn die sichere Übertragung für Blob Storage aktiviert ist, lautet der URIwasbs://
. Siehe auch Vorschreiben einer sicheren Übertragung in Azure Storage.
Übersicht über den Prozess
Der allgemeine Prozess zum Aktivieren Ihres Spark-Clusters für die Abfrage Ihres HBase-Clusters sieht wie folgt aus:
- Bereiten Sie einige Beispieldaten in HBase vor.
- Rufen Sie die Datei „hbase-site.xml“ aus dem Konfigurationsordner Ihres HBase-Clusters (/etc/HBase/conf) ab, und speichern Sie eine Kopie der Datei „hbase-site.xml“ in Ihrem Spark 2-Konfigurationsordner (/etc/spark2/conf). (OPTIONAL: Verwenden Sie das vom HDInsight-Team bereitgestellte Skript, um diesen Vorgang zu automatisieren.)
- Führen Sie
spark-shell
aus, und verweisen Sie in der Optionpackages
mit den Maven-Koordinaten auf den Spark HBase-Connector. - Definieren Sie einen Katalog, der das Spark-Schema zu HBase zuordnet.
- Interagieren Sie entweder über die RDD- oder die Dataframe-APIs mit den HBase-Daten.
Vorbereiten von Beispieldaten in Apache HBase
In diesem Schritt erstellen Sie eine Tabelle in Apache HBase und füllen sie auf. Diese Tabelle können Sie dann mit Spark abfragen.
Verwenden Sie zum Herstellen der Verbindung mit Ihrem HBase-Cluster
ssh
. Bearbeiten Sie den Befehl, indem SieHBASECLUSTER
durch den Namen Ihres HBase-Clusters ersetzen, und geben Sie den Befehl dann ein:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
Verwenden Sie den Befehl
hbase shell
, um die interaktive HBase-Shell zu starten. Geben Sie den folgenden Befehl in Ihrer SSH-Verbindung ein:hbase shell
Verwenden Sie den Befehl
create
, um eine HBase-Tabelle mit zwei Spaltenfamilien zu erstellen. Geben Sie den folgenden Befehl ein:create 'Contacts', 'Personal', 'Office'
Verwenden Sie den Befehl
put
, um Werte in einer angegebenen Spalte einer angegebenen Zeile in einer bestimmten Tabelle einzufügen. Geben Sie den folgenden Befehl ein:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
Verwenden Sie den Befehl
exit
, um die interaktive HBase-Shell zu beenden. Geben Sie den folgenden Befehl ein:exit
Ausführen von Skripts zum Einrichten der Verbindung zwischen Clustern
Führen Sie zum Einrichten der Kommunikation zwischen den Clustern die folgenden Schritte durch, um zwei Skripts in Ihren Clustern auszuführen. Mit diesen Skripts wird der Dateikopiervorgang (siehe Beschreibung im Abschnitt „Manuelles Einrichten der Kommunikation“) automatisiert.
- Mit dem Skript, das Sie im HBase-Cluster ausführen, werden die Datei
hbase-site.xml
sowie HBase-IP-Zuordnungsinformationen in den Standardspeicher hochgeladen, der dem Spark-Cluster zugeordnet ist. - Mit dem im Spark-Cluster ausgeführten Skript werden zwei Cronjobs eingerichtet, die regelmäßig zwei Hilfsskripts ausführen:
- HBase-Cronjob: Herunterladen neuer Versionen der Datei
hbase-site.xml
und der HBase-IP-Zuordnung aus dem Spark-Standardspeicherkonto auf den lokalen Knoten. - Spark-Cronjob: Überprüfen, ob eine Spark-Skalierung durchgeführt wurde und der Cluster sicher ist. Wenn dies der Fall ist, bearbeiten Sie
/etc/hosts
, um die lokal gespeicherte HBase-IP-Zuordnung einzuschließen.
- HBase-Cronjob: Herunterladen neuer Versionen der Datei
HINWEIS: Bevor Sie den Vorgang fortsetzen, müssen Sie sicherstellen, dass Sie das Speicherkonto des Spark-Clusters als sekundäres Speicherkonto zum HBase-Cluster hinzugefügt haben. Achten Sie darauf, dass Sie die Skripts in der angegebenen Reihenfolge ausführen.
Verwenden Sie in Ihrem HBase-Cluster eine Skriptaktion, um die Änderungen unter folgenden Aspekten anzuwenden:
Eigenschaft Wert Bash-Skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Knotentyp(en) Region Parameter -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Persistent ja SECONDARYS_STORAGE_URL
ist der URL des Spark-Standardspeichers. Parameterbeispiel:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Verwenden Sie in Ihrem Spark-Cluster eine Skriptaktion, um die Änderungen unter folgenden Aspekten anzuwenden:
Eigenschaft Wert Bash-Skript-URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Knotentyp(en) Head, Worker, Zookeeper Parameter -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Persistent ja - Sie können angeben, wie oft dieser Cluster automatisch auf Updates überprüfen soll. Standard: -s “*/1 * * * *” -h 0 (In diesem Beispiel wird der Spark-Cronjob jede Minute und der HBase-Cronjob überhaupt nicht ausgeführt)
- Da der HBase-Cron standardmäßig nicht eingerichtet ist, müssen Sie dieses Skript erneut ausführen, wenn Sie eine Skalierung Ihres HBase-Clusters vornehmen. Wenn Sie Ihren HBase-Cluster häufig skalieren, werden Sie sich wahrscheinlich für die automatische Einrichtung des HBase-Cronjobs entscheiden. Beispiel: Mit
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
wird das Skript so konfiguriert, dass alle 30 Minuten Überprüfungen durchgeführt werden. Dadurch wird der HBase-Cronjob regelmäßig nach Zeitplan ausgeführt, und das Herunterladen neuer HBase-Informationen aus dem allgemeinen Speicherkonto auf den lokalen Knoten wird automatisiert.
Hinweis
Diese Skripts funktionieren nur auf HDI 5.0- und HDI 5.1-Clustern.
Manuelles Einrichten der Kommunikation (optionale Vorgehensweise bei Fehlfunktion des oben genannten Skripts)
HINWEIS: Diese Schritte müssen jedes Mal ausgeführt werden, wenn für einen der Cluster ein Skalierungsvorgang durchgeführt wird.
Kopieren Sie die Datei „hbase-site.xml“ aus dem lokalen Speicher in das Stammverzeichnis des Standardspeichers Ihres Spark-Clusters. Bearbeiten Sie den Befehl, um ihn an Ihre Konfiguration anzupassen. Geben Sie anschließend in Ihrer geöffneten SSH-Sitzung diesen Befehl für den HBase-Cluster ein:
Syntaxwert Neuer Wert URI-Schema Nehmen Sie Änderungen zur Anpassung an Ihren Speicher vor. Die angegebene Syntax gilt für Blobspeicher mit aktivierter sicherer Übertragung. SPARK_STORAGE_CONTAINER
Fügen Sie den Namen Ihres Standardspeichercontainers ein, der für den Spark-Cluster verwendet wird. SPARK_STORAGE_ACCOUNT
Fügen Sie den Namen Ihres Standardspeicherkontos ein, der für den Spark-Cluster verwendet wird. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Trennen Sie dann die SSH-Verbindung mit Ihrem HBase-Cluster.
exit
Stellen Sie über SSH eine Verbindung mit dem Hauptknoten Ihres Spark-Clusters her. Bearbeiten Sie den Befehl, indem Sie
SPARKCLUSTER
durch den Namen Ihres Spark-Clusters ersetzen, und führen Sie dann den Befehl aus:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Geben Sie den folgenden Befehl ein, um
hbase-site.xml
aus dem Standardspeicher Ihres Spark-Clusters in den Spark 2-Konfigurationsordner im lokalen Speicher des Clusters zu kopieren:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Ausführen der Spark-Shell und Verweisen auf den Spark HBase-Connector
Nachdem Sie den vorherigen Schritt abgeschlossen haben, sollten Sie die Spark-Shell ausführen können, die auf die entsprechende Version des Spark HBase-Connectors verweist.
In der folgenden Tabelle werden beispielsweise zwei Versionen und die entsprechenden Befehle aufgelistet, die vom HDInsight-Team derzeit verwendet werden. Sie können die gleichen Versionen für Ihre Cluster verwenden, wenn die Versionen von HBase und Spark den in der Tabelle aufgeführten entsprechen.
Geben Sie in Ihrer geöffneten SSH-Sitzung für den Spark-Cluster den folgenden Befehl ein, um eine Spark-Shell zu starten:
Spark-Version HDI HBase-Version SHC-Version Get-Help 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Lassen Sie diese Instanz der Spark-Shell geöffnet, und fahren Sie mit dem Definieren von Katalog und Abfrage fort. Wenn Sie die JAR-Dateien nicht finden, die Ihren Versionen im SHC Core-Respository entsprechen, lesen Sie weiter.
Bei nachfolgenden Kombinationen von Spark- und HBase-Versionen werden diese Artefakte nicht mehr im obigen Repository veröffentlicht. Sie können die JAR-Dateien direkt über die GitHub-Verzweigung spark-hbase-connector erstellen. Führen Sie beispielsweise die folgenden Schritte aus, wenn Sie Spark 2.4 und HBase 2.1 verwenden:
Klonen Sie das Repository:
git clone https://github.com/hortonworks-spark/shc
Wechseln Sie zu „branch-2.4“:
git checkout branch-2.4
Nehmen Sie die Erstellung auf der Grundlage des Branch vor (Erstellung einer JAR-Datei):
mvn clean package -DskipTests
Führen Sie den folgenden Befehl aus. (Ändern Sie unbedingt den JAR-Namen, der der von Ihnen erstellten JAR-Datei entspricht.)
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Lassen Sie diese Instanz der Spark-Shell geöffnet, und fahren Sie mit dem nächsten Abschnitt fort.
Definieren von Katalog und Abfrage
In diesem Schritt definieren Sie ein Katalogobjekt, das das Apache Spark-Schema Apache HBase zuordnet.
Geben Sie in Ihrer geöffneten Spark-Shell die folgenden
import
-Anweisungen ein:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Geben Sie den folgenden Befehl ein, um einen Katalog für die Tabelle „Kontakte“ zu definieren, die Sie in HBase erstellt haben:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Der Code:
- definiert ein Katalogschema für die HBase-Tabelle namens
Contacts
. - identifiziert das rowkey-Element als
key
und ordnet die in Spark verwendeten Spaltennamen der Spaltenfamilie, dem Spaltennamen und dem Spaltentyp zu, die in HBase verwendet werden. - definiert den rowkey im Detail als benannte Spalte (
rowkey
), die über eine spezifischecf
-Spaltenfamilierowkey
verfügt.
- definiert ein Katalogschema für die HBase-Tabelle namens
Geben Sie den folgenden Befehl ein, um eine Methode zu definieren, die einen Datenrahmen um Ihre
Contacts
-Tabelle in HBase bereitstellt:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Erstellen Sie eine Instanz des Datenrahmens:
val df = withCatalog(catalog)
Fragen Sie den Datenrahmen ab:
df.show()
Es sollten zwei Zeilen mit Daten angezeigt werden:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Registrieren Sie eine temporäre Tabelle, sodass Sie die HBase-Tabelle mithilfe von Spark SQL abfragen können:
df.createTempView("contacts")
Erstellen Sie eine SQL-Abfrage für die
contacts
-Tabelle:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Die Ergebnisse sollten wie folgt aussehen:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Einfügen neuer Daten
Um einen neuen Kontaktdatensatz einzufügen, definieren Sie eine
ContactRecord
-Klasse:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Erstellen Sie eine Instanz von
ContactRecord
, und fügen Sie sie in ein Array ein:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Speichern Sie das Array mit den neuen Daten in HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Untersuchen Sie die Ergebnisse:
df.show()
Es sollte in etwa folgende Ausgabe angezeigt werden:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Geben Sie den folgenden Befehl ein, um die Spark-Shell zu schließen:
:q