Usare Apache Spark per leggere e scrivere dati Apache HBase
Le query in Apache HBase vengono in genere eseguite con l'API di basso livello corrispondente (scan, get e put) o con una sintassi SQL tramite Apache Phoenix. Apache fornisce anche il connettore Apache Spark HBase. Il connettore è un'alternativa conveniente ed efficiente per eseguire query e modificare i dati archiviati da HBase.
Prerequisiti
Due cluster HDInsight separati distribuiti nella stessa rete virtuale. Una HBase e una Spark con almeno Spark 2.1 (HDInsight 3.6) installata. Per altre informazioni, vedere Creare cluster basati su Linux in HDInsight tramite il portale di Azure.
Lo schema URI per l'archiviazione primaria dei cluster. Questo schema sarebbe wasb:// per Archiviazione BLOB di Azure,
abfs://
per Azure Data Lake Storage Gen2 o adl:// per Azure Data Lake Storage Gen1. Se il trasferimento sicuro è abilitato per l'archiviazione BLOB, l'URI saràwasbs://
. Vedere anche l'articolo sul trasferimento sicuro.
Processo generale
Il processo generale per abilitare il cluster Spark per eseguire query sul cluster HBase è il seguente:
- Preparare alcuni dati di esempio in HBase.
- Acquisire il file hbase-site.xml dalla cartella di configurazione del cluster HBase (/etc/hbase/conf) e inserire una copia di hbase-site.xml nella cartella di configurazione di Spark 2 (/etc/spark2/conf). (FACOLTATIVO: usare lo script fornito dal team DI HDInsight per automatizzare questo processo)
- Eseguire
spark-shell
facendo riferimento al connettore HBase Spark dalle relative coordinate Maven nell'opzionepackages
. - Definire un catalogo corrispondente allo schema da Spark a HBase.
- Interagire con i dati di HBase tramite le API RDD o DataFrame.
Preparare i dati di esempio in Apache HBase
In questo passaggio viene creata e popolata una tabella in Apache HBase che è quindi possibile eseguire query usando Spark.
Usare il
ssh
comando per connettersi al cluster HBase. Modificare il comando sostituendoHBASECLUSTER
con il nome del cluster HBase e quindi immettere il comando:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
Usare il comando per avviare la
hbase shell
shell interattiva HBase. Immettere il comando seguente nella connessione SSH:hbase shell
Usare il
create
comando per creare una tabella HBase con famiglie a due colonne. Immettere il comando seguente:create 'Contacts', 'Personal', 'Office'
Usare il
put
comando per inserire valori in una colonna specificata in una riga specificata in una determinata tabella. Immettere il comando seguente:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
Usare il comando per arrestare la
exit
shell interattiva HBase. Immettere il comando seguente:exit
Eseguire script per configurare la connessione tra cluster
Per configurare la comunicazione tra cluster, seguire la procedura per eseguire due script nei cluster. Questi script automatizzano il processo di copia dei file descritto nella sezione "Configurare manualmente la comunicazione".
- Lo script eseguito dal cluster HBase caricherà
hbase-site.xml
e le informazioni sul mapping IP di HBase all'archiviazione predefinita collegata al cluster Spark. - Lo script eseguito dal cluster Spark configura due processi cron per eseguire periodicamente due script helper:
- Processo HBase cron: scaricare nuovi
hbase-site.xml
file e mapping IP HBase dall'account di archiviazione predefinito spark al nodo locale - Processo spark cron: verifica se si è verificato un ridimensionamento spark e se il cluster è sicuro. In tal caso, modificare
/etc/hosts
per includere il mapping IP HBase archiviato in locale
- Processo HBase cron: scaricare nuovi
NOTA: prima di procedere, assicurarsi di aver aggiunto l'account di archiviazione del cluster Spark al cluster HBase come account di archiviazione secondario. Assicurarsi che gli script siano ordinati come indicato.
Usare l'azione script nel cluster HBase per applicare le modifiche con le considerazioni seguenti:
Proprietà Valore URI script Bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
Tipo/i di nodo Region Parametri -s SECONDARYS_STORAGE_URL
Persisted sì -
SECONDARYS_STORAGE_URL
è l'URL dell'archiviazione predefinita sul lato Spark. Esempio di parametro:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
-
Usare l'azione script nel cluster Spark per applicare le modifiche con le considerazioni seguenti:
Proprietà Valore URI script Bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
Tipo/i di nodo Head, Worker, Zookeeper Parametri -s "SPARK-CRON-SCHEDULE"
(facoltativo)-h "HBASE-CRON-SCHEDULE"
(facoltativo)Persisted sì - È possibile specificare la frequenza con cui si vuole che questo cluster verifichi automaticamente se l'aggiornamento. Impostazione predefinita: -s "*/1 * * " -h 0 (In questo esempio, spark cron viene eseguito ogni minuto, mentre il cron HBase non viene eseguito)
- Poiché HBase cron non è configurato per impostazione predefinita, è necessario eseguire nuovamente questo script durante l'esecuzione del ridimensionamento nel cluster HBase. Se spesso il cluster HBase viene ridimensionato, è possibile scegliere di configurare automaticamente il processo HBase cron. Ad esempio:
-h "*/30 * * * *"
configura lo script per eseguire controlli ogni 30 minuti. Questa operazione eseguirà periodicamente la pianificazione cron di HBase per automatizzare il download delle nuove informazioni HBase sull'account di archiviazione comune nel nodo locale.
Configurare manualmente la comunicazione (facoltativo, se specificato script nel passaggio precedente ha esito negativo)
NOTA: Questi passaggi devono eseguire ogni volta che uno dei cluster subisce un'attività di ridimensionamento.
Copiare il hbase-site.xml dall'archiviazione locale alla radice dell'archiviazione predefinita del cluster Spark. Modificare il comando per riflettere la configurazione. Quindi, dalla sessione SSH aperta al cluster HBase immettere il comando:
Valore della sintassi Nuovo valore Schema URI Modificare per riflettere l'archiviazione. La sintassi è per l'archiviazione BLOB con trasferimento sicuro abilitato. SPARK_STORAGE_CONTAINER
Sostituire con il nome del contenitore di archiviazione predefinito usato per il cluster Spark. SPARK_STORAGE_ACCOUNT
Sostituire con il nome dell'account di archiviazione predefinito usato per il cluster Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Uscire quindi dalla connessione SSH al cluster HBase.
exit
Connettersi al nodo head del cluster Spark tramite SSH. Modificare il comando sostituendo
SPARKCLUSTER
con il nome del cluster Spark e quindi immettere il comando:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Immettere il comando da copiare
hbase-site.xml
dall'archiviazione predefinita del cluster Spark nella cartella di configurazione di Spark 2 nell'archiviazione locale del cluster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Eseguire la shell di Spark facendo riferimento al connettore HBase Spark
Dopo aver completato il passaggio precedente, è necessario poter eseguire la shell Spark, facendo riferimento alla versione appropriata di Spark HBase Connector.
Ad esempio, la tabella seguente elenca due versioni e i comandi corrispondenti usati dal team HDInsight. È possibile usare le stesse versioni per i cluster se le versioni di HBase e Spark sono uguali a quanto indicato nella tabella.
Nella sessione SSH aperta al cluster Spark immettere il comando seguente per avviare una shell Spark:
Versione di Spark Versione di HDI HBase Versione di SHC Comando 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Mantenere aperta l'istanza della shell Spark e continuare a Definire un catalogo e una query. Se non si trovano i file JAR corrispondenti alle versioni nel repository SHC Core, continuare la lettura.
Per le combinazioni successive di versioni di Spark e HBase, questi artefatti non vengono più pubblicati nel repository precedente. È possibile compilare i file JAR direttamente dal ramo GitHub spark-hbase-connector . Ad esempio, se si esegue con Spark 2.4 e HBase 2.1, completare questi passaggi:
Clonare il repository:
git clone https://github.com/hortonworks-spark/shc
Passare a branch-2.4:
git checkout branch-2.4
Compilazione dal ramo (crea un file con estensione jar):
mvn clean package -DskipTests
Eseguire il comando seguente (assicurarsi di modificare il nome con estensione jar corrispondente al file con estensione jar creato):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Mantenere aperta l'istanza della shell Spark e continuare con la sezione successiva.
Definire un catalogo e una query
In questo passaggio, definire un oggetto catalogo corrispondente allo schema da Apache Spark ad Apache HBase.
Nella shell Spark aperta immettere le istruzioni seguenti
import
:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Immettere il comando seguente per definire un catalogo per la tabella Contatti creata in HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Il codice:
- Definisce uno schema del catalogo per la tabella HBase denominata
Contacts
. - Identifica la chiave di riga come
key
e esegue il mapping dei nomi di colonna usati in Spark alla famiglia di colonne, al nome della colonna e al tipo di colonna usato in HBase. - Definisce la chiave di riga in dettaglio come colonna denominata (
rowkey
), con una famigliacf
di colonne specifica dirowkey
.
- Definisce uno schema del catalogo per la tabella HBase denominata
Immettere il comando per definire un metodo che fornisce un dataframe intorno alla
Contacts
tabella in HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Creare un'istanza del DataFrame:
val df = withCatalog(catalog)
Recuperare il DataFrame:
df.show()
Dovrebbero essere visualizzate due righe di dati:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Registrare una tabella temporanea in modo che sia possibile eseguire query nella tabella HBase tramite Spark SQL:
df.createTempView("contacts")
Eseguire una query SQL sulla tabella
contacts
:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Dovrebbero essere visualizzati risultati simili ai seguenti:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Inserire nuovi dati
Per inserire un nuovo record di contatto, definire una classe
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Creare un'istanza di
ContactRecord
e inserirlo in una matrice:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Salvare la matrice dei nuovi dati in HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Esaminare i risultati:
df.show()
Verrà visualizzato un output simile al seguente:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Chiudere la shell spark immettendo il comando seguente:
:q