Utilizar o Apache Spark para ler e escrever dados do Apache HBase
O Apache HBase normalmente é consultado com sua API de baixo nível (scans, gets, e puts) ou com uma sintaxe SQL usando o Apache Phoenix. O Apache também fornece o Apache Spark HBase Connector. O conector é uma alternativa conveniente e eficiente para consultar e modificar dados armazenados pelo HBase.
Pré-requisitos
Dois clusters HDInsight separados implantados na mesma rede virtual. Um HBase e um Spark com pelo menos o Spark 2.1 (HDInsight 3.6) instalado. Para obter mais informações, consulte Criar clusters baseados em Linux no HDInsight usando o portal do Azure.
O esquema de URI para o armazenamento primário de clusters. Esse esquema seria wasb:// para o Armazenamento de Blobs do Azure,
abfs://
para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento de Blob, o URI seráwasbs://
. Consulte também, transferência segura.
Processo geral
O processo de alto nível para permitir que o cluster do Spark consulte o cluster HBase é o seguinte:
- Prepare alguns dados de amostra no HBase.
- Adquira o arquivo hbase-site.xml da pasta de configuração do cluster HBase (/etc/hbase/conf) e coloque uma cópia do hbase-site.xml na pasta de configuração do Spark 2 (/etc/spark2/conf). (OPCIONAL: use o script fornecido pela equipe do HDInsight para automatizar esse processo)
- Execute
spark-shell
a referência ao Spark HBase Connector por suas coordenadas Maven napackages
opção. - Defina um catálogo que mapeie o esquema do Spark para o HBase.
- Interaja com os dados do HBase usando as APIs RDD ou DataFrame.
Preparar dados de exemplo no Apache HBase
Nesta etapa, você cria e preenche uma tabela no Apache HBase que pode ser consultada usando o Spark.
Use o
ssh
comando para se conectar ao cluster HBase. Edite o comando substituindoHBASECLUSTER
pelo nome do cluster HBase e digite o comando:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
Use o
hbase shell
comando para iniciar o shell interativo do HBase. Digite o seguinte comando em sua conexão SSH:hbase shell
Use o
create
comando para criar uma tabela HBase com famílias de duas colunas. Introduza o seguinte comando:create 'Contacts', 'Personal', 'Office'
Use o
put
comando para inserir valores em uma coluna especificada em uma linha especificada em uma tabela específica. Introduza o seguinte comando:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
Use o
exit
comando para parar o shell interativo do HBase. Introduza o seguinte comando:exit
Executar scripts para configurar a conexão entre clusters
Para configurar a comunicação entre clusters, siga as etapas para executar dois scripts em seus clusters. Esses scripts automatizarão o processo de cópia de arquivos descrito na seção 'Configurar a comunicação manualmente'.
- O script executado a partir do cluster HBase carregará
hbase-site.xml
as informações de mapeamento IP do HBase para o armazenamento padrão anexado ao cluster do Spark. - O script executado a partir do cluster do Spark configura dois trabalhos cron para executar dois scripts auxiliares periodicamente:
- HBase cron job – faça download de novos
hbase-site.xml
arquivos e mapeamento de IP do HBase da conta de armazenamento padrão do Spark para o nó local - Spark cron job – verifica se ocorreu um dimensionamento do Spark e se o cluster é seguro. Em caso afirmativo, edite
/etc/hosts
para incluir o mapeamento IP do HBase armazenado localmente
- HBase cron job – faça download de novos
NOTA: Antes de continuar, certifique-se de que adicionou a conta de armazenamento do cluster Spark ao cluster HBase como conta de armazenamento secundário. Certifique-se de que os scripts estão em ordem, conforme indicado.
Use a Ação de script no cluster do HBase para aplicar as alterações com as seguintes considerações:
Property valor Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Tipo(s) de nó(s) País/Região Parâmetros -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Persistiu sim SECONDARYS_STORAGE_URL
é a url do armazenamento padrão do lado do Spark. Exemplo de parâmetro:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Use a Ação de Script no cluster do Spark para aplicar as alterações com as seguintes considerações:
Property valor Bash script URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Tipo(s) de nó(s) Chefe, Trabalhador, Zookeeper Parâmetros -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Persistiu sim - Você pode especificar com que frequência deseja que esse cluster verifique automaticamente se está atualizado. Padrão: -s "*/1 * * * *" -h 0 (Neste exemplo, o cron do Spark é executado a cada minuto, enquanto o cron do HBase não é executado)
- Como o cron do HBase não está configurado por padrão, você precisa executar esse script novamente ao executar o dimensionamento para o cluster do HBase. Se o cluster do HBase for dimensionado com frequência, você poderá optar por configurar o trabalho cron do HBase automaticamente. Por exemplo:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
configura o script para executar verificações a cada 30 minutos. Isso executará o cronograma cron do HBase periodicamente para automatizar o download de novas informações do HBase na conta de armazenamento comum para o nó local.
Nota
Esses scripts funcionam apenas em clusters HDI 5.0 e HDI 5.1.
Configurar a comunicação manualmente (Opcional, se o script fornecido na etapa acima falhar)
NOTA: Estas etapas precisam ser executadas sempre que um dos clusters passa por uma atividade de dimensionamento.
Copie o hbase-site.xml do armazenamento local para a raiz do armazenamento padrão do cluster Spark. Edite o comando para refletir sua configuração. Em seguida, da sessão SSH aberta para o cluster HBase, digite o comando:
Valor da sintaxe Novo valor Esquema de URI Modifique para refletir seu armazenamento. A sintaxe é para armazenamento de blob com transferência segura habilitada. SPARK_STORAGE_CONTAINER
Substitua pelo nome do contêiner de armazenamento padrão usado para o cluster do Spark. SPARK_STORAGE_ACCOUNT
Substitua pelo nome da conta de armazenamento padrão usado para o cluster do Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Em seguida, saia da conexão ssh para o cluster HBase.
exit
Conecte-se ao nó principal do cluster do Spark usando SSH. Edite o comando substituindo
SPARKCLUSTER
pelo nome do cluster do Spark e digite o comando:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Insira o comando para copiar
hbase-site.xml
do armazenamento padrão do cluster Spark para a pasta de configuração do Spark 2 no armazenamento local do cluster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Execute o Spark Shell fazendo referência ao Spark HBase Connector
Depois de concluir a etapa anterior, você poderá executar o shell do Spark, fazendo referência à versão apropriada do Spark HBase Connector.
Como exemplo, a tabela a seguir lista duas versões e os comandos correspondentes que a equipe do HDInsight usa atualmente. Você pode usar as mesmas versões para seus clusters se as versões do HBase e do Spark forem as mesmas indicadas na tabela.
Na sessão SSH aberta para o cluster do Spark, digite o seguinte comando para iniciar um shell do Spark:
Versão do Spark HDI HBase versão Versão SHC Comando 2.1 IDH 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Mantenha essa instância do shell do Spark aberta e continue para Definir um catálogo e uma consulta. Se você não encontrar os jars que correspondem às suas versões no repositório SHC Core, continue lendo.
Para combinações subsequentes das versões Spark e HBase, esses artefatos não são mais publicados no repositório acima. Você pode construir os frascos diretamente da ramificação GitHub spark-hbase-connector . Por exemplo, se você estiver executando com o Spark 2.4 e o HBase 2.1, conclua estas etapas:
Clone o repo:
git clone https://github.com/hortonworks-spark/shc
Ir para ramo-2.4:
git checkout branch-2.4
Compilar a partir da ramificação (cria um arquivo .jar):
mvn clean package -DskipTests
Execute o seguinte comando (certifique-se de alterar o nome .jar que corresponde ao arquivo .jar que você criou):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Mantenha esta instância do shell do Spark aberta e continue para a próxima seção.
Definir um catálogo e uma consulta
Nesta etapa, você define um objeto de catálogo que mapeia o esquema do Apache Spark para o Apache HBase.
No Spark Shell aberto, insira as seguintes
import
instruções:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Digite o comando abaixo para definir um catálogo para a tabela Contatos que você criou no HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
O código:
- Define um esquema de catálogo para a tabela do HBase chamada
Contacts
. - Identifica a chave de linha como
key
, e mapeia os nomes de coluna usados no Spark para a família de colunas, o nome da coluna e o tipo de coluna usados no HBase. - Define a chave de linha em detalhes como uma coluna nomeada (
rowkey
), que tem uma famíliacf
de colunas específica derowkey
.
- Define um esquema de catálogo para a tabela do HBase chamada
Insira o comando para definir um método que forneça um DataFrame ao redor de sua
Contacts
tabela no HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Crie uma instância do DataFrame:
val df = withCatalog(catalog)
Consulte o DataFrame:
df.show()
Você verá duas linhas de dados:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Registre uma tabela temporária para que você possa consultar a tabela do HBase usando o Spark SQL:
df.createTempView("contacts")
Emita uma consulta SQL na
contacts
tabela:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Você deve ver resultados como estes:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Inserir novos dados
Para inserir um novo registro de contato, defina uma
ContactRecord
classe:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Crie uma instância de
ContactRecord
e coloque-a em uma matriz:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Salve a matriz de novos dados no HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Examine os resultados:
df.show()
Deverá ver um resultado como este:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Feche o shell de faísca digitando o seguinte comando:
:q