Uso de Apache Spark para leer y escribir datos de Apache HBase
Apache HBase se consulta, normalmente, con su API de bajo nivel (instrucciones scan, get y put) o con una sintaxis SQL que utiliza Apache Phoenix. Apache también proporciona el conector HBase de Apache Spark. Este es una alternativa práctica y eficaz para consultar y modificar los datos que almacena HBase.
Requisitos previos
Dos clústeres de HDInsight independientes implementados en la misma red virtual. HBase y Spark con al menos Spark 2.1 (HDInsight 3.6) instalado. Para obtener más información, consulte Crear clústeres basados en Linux en HDInsight con Azure Portal.
El esquema de URI para el almacenamiento principal de clústeres. Este esquema sería wasb:// para Azure Blob Storage,
abfs://
para Azure Data Lake Storage Gen2 o adl:// para Azure Data Lake Storage Gen1. Si se habilita la transferencia segura para Blob Storage, el identificador URI seríawasbs://
. Consulte también el artículo acerca de la transferencia segura.
Proceso general
El proceso de alto nivel para habilitar el clúster de Spark para consultar el clúster de HBase es el siguiente:
- Prepare algunos datos de ejemplo en HBase.
- Obtenga el archivo hbase-site.xml de la carpeta de configuración del clúster de HBase (/etc/hbase/conf) y coloque una copia de hbase-site.xml en la carpeta de configuración de Spark 2 (/etc/spark2/conf). (OPCIONAL: usar el script proporcionado por el equipo de HDInsight para automatizar este proceso)
- Ejecute
spark-shell
haciendo referencia al conector Spark HBase por sus coordenadas Maven en la opciónpackages
. - Defina un catálogo que asigne el esquema de Spark a HBase.
- Interactúe con los datos de HBase mediante las API de DataFrame o RDD.
Preparación de datos de ejemplo en Apache HBase
En este paso, se crea y rellena una tabla en Apache HBase que, después, se podrá consultar mediante Spark.
Use el comando
ssh
para conectarse al clúster de HBase. Modifique el comando. Para ello, reemplaceHBASECLUSTER
por el nombre del clúster de Hbase y, luego, escriba el comando:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
Use el comando
hbase shell
para iniciar el shell interactivo de HBase. Escriba el siguiente comando en la conexión SSH:hbase shell
Use el comando
create
para crear una tabla de HBase con dos familias de columnas. Escriba el comando siguiente:create 'Contacts', 'Personal', 'Office'
Use el comando
put
para insertar valores en una columna especificada de una fila concreta de una tabla determinada. Escriba el comando siguiente:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
Use el comando
exit
para detener el shell interactivo de HBase. Escriba el comando siguiente:exit
Ejecución de scripts para configurar la conexión entre clústeres
Para configurar la comunicación entre los clústeres, siga los pasos para ejecutar dos scripts en los clústeres. Estos scripts automatizarán el proceso de copia de archivos descrito en la sección "Configuración manual de la comunicación".
- El script que ejecute desde el clúster de HBase cargará
hbase-site.xml
y la información de asignación de IP de HBase en el almacenamiento predeterminado asociado al clúster de Spark. - El script que ejecute desde el clúster de Spark configurará dos trabajos de cron para ejecutar dos scripts auxiliares periódicamente:
- Trabajo de cron de HBase: descarga nuevos archivos
hbase-site.xml
y la asignación de IP de HBase de la cuenta de almacenamiento predeterminada de Spark al nodo local - Trabajo de cron de Spark: comprueba si se ha producido un escalado de Spark y si el clúster es seguro. Si es así, edite
/etc/hosts
para incluir la asignación de IP de HBase almacenada localmente
- Trabajo de cron de HBase: descarga nuevos archivos
NOTA: Antes de continuar, asegúrese de que ha agregado la cuenta de almacenamiento del clúster de Spark a su clúster de HBase como cuenta de almacenamiento secundaria. Asegúrese de que los scripts estén ordenados como se indica.
Use Acción de script en el clúster de HBase para aplicar los cambios con las siguientes consideraciones:
Propiedad Value URI de script de Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Tipos de nodo Region Parámetros -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Guardado sí SECONDARYS_STORAGE_URL
es la dirección URL del almacenamiento predeterminado del lado de Spark. Ejemplo de parámetro:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Use Acción de script en el clúster de Spark para aplicar los cambios con las siguientes consideraciones:
Propiedad Value URI de script de Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Tipos de nodo Head, Worker, Zookeeper Parámetros -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Guardado sí - Puede especificar la frecuencia con la que desee que este clúster compruebe automáticamente si actualizarse. Valor predeterminado: -s “*/1 * * * *” -h 0 (en este ejemplo, el trabajo de cron de Spark se ejecuta cada minuto, mientras que el trabajo de cron de HBase no se ejecuta)
- Dado que el trabajo de cron de HBase no se configura de forma predeterminada, debe volver a ejecutar este script cuando realice el escalado a su clúster de HBase. Si el clúster de HBase se escala a menudo, puede optar por configurar el trabajo de cron de HBase automáticamente. Por ejemplo:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
configura el script para realizar comprobaciones cada 30 minutos. Esto ejecutará la programación de cron de HBase periódicamente para automatizar la descarga de nueva información de HBase de la cuenta de almacenamiento común al nodo local.
Nota:
Estos scripts solo funcionan en clústeres HDI 5.0 y HDI 5.1.
Configuración manual de la comunicación (opcional, si se produce un error en el script proporcionado en el paso anterior)
NOTA: Estos pasos deben realizarse cada vez que se realiza una actividad de escalado en uno de los clústeres.
Copie hbase-site.xml del almacenamiento local a la raíz del almacenamiento predeterminado del clúster de Spark. Edite el comando para reflejar la configuración. A continuación, en la sesión SSH abierta para el clúster de HBase, escriba el comando:
Valor de sintaxis Valor nuevo Esquema de URI Modifíquelo para reflejar su almacenamiento. La sintaxis es para el almacenamiento de blobs con transferencia segura habilitada. SPARK_STORAGE_CONTAINER
Reemplácelo con el nombre del contenedor de almacenamiento predeterminado que se usa para el clúster de Spark. SPARK_STORAGE_ACCOUNT
Reemplácelo por el nombre predeterminado de la cuenta de almacenamiento que se usa para el clúster de Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Luego, salga de su conexión ssh con el clúster de HBase.
exit
Conéctese al nodo principal del clúster Spark mediante SSH. Modifique el comando al reemplazar
SPARKCLUSTER
por el nombre del clúster de Spark y luego escriba el comando:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Escriba el comando para copiar
hbase-site.xml
del almacenamiento predeterminado del clúster de Spark en la carpeta de configuración de Spark 2 en el almacenamiento local del clúster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Ejecutar el shell Spark haciendo referencia al conector Spark HBase
Después de completar el paso anterior, debería poder ejecutar el shell de Spark con referencia a la versión adecuada del conector Spark HBase.
Como ejemplo, en la tabla siguiente se muestran dos versiones y los comandos correspondientes que el equipo de HDInsight usa actualmente. Si las versiones de HBase y Spark son las mismas que las indicadas en la tabla, puede usar las mismas versiones de los clústeres.
En la sesión SSH que tiene abierta del clúster de Spark, escriba el siguiente comando para iniciar un shell de Spark:
Versión de Spark Versión de HDI de HBase Versión de SHC Get-Help 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Mantenga abierta esta instancia de shell de Spark y continúe con Definir un catálogo y una consulta. Si no encuentra los archivos JAR correspondientes a sus versiones en el repositorio de SHC Core, siga leyendo.
Con las combinaciones posteriores de las versiones de Spark y HBase, estos artefactos ya no se publican en el repositorio anterior. Puede compilar los archivos JAR directamente desde la rama GitHub de Spark-HBase-Connector. Por ejemplo, si ejecuta Spark 2.4 y HBase 2.1, siga estos pasos:
Clonación del repositorio:
git clone https://github.com/hortonworks-spark/shc
Vaya a branch-2.4:
git checkout branch-2.4
Compilar a partir de la rama (crea un archivo .jar):
mvn clean package -DskipTests
Ejecute el siguiente comando (asegúrese de cambiar el nombre .jar que corresponde al archivo .jar que ha creado):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Mantenga abierta esta instancia de shell de Spark y continúe con el paso siguiente.
Definir un catálogo y una consulta
En este paso, se define un objeto de catálogo que asigna el esquema de Apache Spark a Apache HBase.
En el shell abierto de Spark, ejecute las siguientes instrucciones
import
:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Escriba el comando siguiente para definir un catálogo para la tabla Contactos que creó en HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
El código:
- Define un esquema de catálogo para la tabla HBase denominada
Contacts
. - Identifica el objeto rowkey como
key
y asigna los nombres de columna usados en Spark a la familia de columna, el nombre de columna y el tipo de columna tal como se usa en HBase. - Define el objeto rowkey en detalle como una columna con nombre (
rowkey
), que tiene una familia de columna específicacf
derowkey
.
- Define un esquema de catálogo para la tabla HBase denominada
Escriba el comando para definir un método que proporcione un elemento DataFrame alrededor de su tabla
Contacts
en HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Cree una instancia de DataFrame:
val df = withCatalog(catalog)
Consulte el elemento DataFrame:
df.show()
Verá dos filas de datos:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Registre una tabla temporal para poder consultar la tabla HBase mediante Spark SQL:
df.createTempView("contacts")
Emita una consulta SQL en la tabla
contacts
:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Verá resultados parecidos a los siguientes:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Insertar nuevos datos
Para insertar un nuevo registro de contacto, defina una clase
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Cree una instancia de
ContactRecord
y colóquela en una matriz:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Guarde la matriz de nuevos datos en HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Examine los resultados:
df.show()
La salida debería ser parecida a la que se muestra a continuación:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Para cerrar el shell de Spark, escriba el comando siguiente:
:q