Usar o Apache Spark para ler e gravar dados do Apache HBase

Apache HBase costuma ser consultada com sua API de nível inferior (verificações, obtenções e colocações) ou com uma sintaxe SQL usando Apache Phoenix. O Apache também fornece o Apache Spark HBase Connector. O Connector é uma alternativa prática e eficiente para consultar e modificar dados armazenados pelo HBase.

Pré-requisitos

  • Dois clusters HDInsight separados implantados na mesma rede virtual. Um HBase e também um Spark com pelo menos o Spark 2.1 (HDInsight 3.6) instalado. Para obter mais informações, consulte Criar clusters baseados em Linux no HDInsight usando o portal do Azure.

  • O esquema de URI do seu armazenamento primário de clusters. Esse esquema seria wasb:// para o Armazenamento de Blobs do Azure, abfs:// para o Azure Data Lake Storage Gen2 ou adl:// para o Azure Data Lake Storage Gen1. Se a transferência segura estiver habilitada para o Armazenamento de Blobs, o URI será wasbs://. Confira também Transferência segura.

Processo geral

Este é o processo de alto nível que permite que o cluster Spark consulte o cluster HDInsight:

  1. Preparar alguns dados de exemplo no HBase.
  2. Obtenha o arquivo hbase-site.xml na pasta de configuração de cluster do HBase (/etc/hbase/conf) e coloque uma cópia dele na pasta de configuração do Spark 2 (/etc/spark2/conf). Opcional: use o script fornecido pela equipe do HDInsight para automatizar esse processo.
  3. Execute spark-shell referenciar o conector do HBase Spark por seu Maven coordena na opção packages.
  4. Defina um catálogo que mapeia o esquema do Spark para HBase.
  5. Interagir com os dados do HBase usando as APIs de DataFrame ou RDD.

Preparar os dados de exemplo no Apache HBase

Nesta etapa, você pode criar e preencher uma tabela no Apache HBase que pode ser consultada com o Spark.

  1. Use o comando ssh para se conectar ao cluster HBase. Edite o comando substituindo HBASECLUSTER pelo nome do seu cluster HBase e insira o comando:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Use o comando hbase shell para iniciar o shell interativo do HBase. Digite o seguinte comando em sua conexão de SSH:

    hbase shell
    
  3. Use o comando create para criar uma tabela do HBase com famílias de duas colunas. Insira o seguinte comando:

    create 'Contacts', 'Personal', 'Office'
    
  4. Use o comando put para inserir valores em coluna e linha específicas de uma determinada tabela. Insira o seguinte comando:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Use o comando exit para interromper o shell interativo do HBase. Insira o seguinte comando:

    exit
    

Executar scripts para configurar a conexão entre clusters

Para configurar a comunicação entre clusters, siga as etapas para executar dois scripts em seus clusters. Esses scripts automatizarão o processo de cópia de arquivo descrito na seção 'Configurar a comunicação manualmente'.

  • O script executado no cluster HBase carregará o arquivo hbase-site.xml e as informações de mapeamento de IP do HBase para o armazenamento padrão anexado ao cluster Spark.
  • O script executado no cluster Spark configura dois trabalhos cron para executar dois scripts auxiliares periodicamente:
    1. Trabalho cron do HBase – baixa novos arquivos hbase-site.xml e o mapeamento de IP do HBase da conta de armazenamento padrão do Spark para o nó local.
    2. Trabalho cron do Spark – verifica se ocorreu um dimensionamento do Spark e se o cluster é seguro. Nesse caso, edite /etc/hosts para incluir o mapeamento de IP do HBase armazenado localmente.

OBSERVAÇÃO: antes de prosseguir, verifique se você adicionou a conta de armazenamento do cluster Spark ao seu cluster HBase como conta de armazenamento secundária. Verifique se os scripts estão na ordem indicada.

  1. Use Ação de Script no cluster HBase para aplicar as alterações com as seguintes considerações:

    Propriedade Valor
    URI do script Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
    Tipo(s) de nó Região
    Parâmetros -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
    Persistente sim
    • SECONDARYS_STORAGE_URL é a URL do armazenamento padrão do lado do Spark. Exemplo de parâmetro: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
  2. Use Ação de Script no cluster Spark para aplicar as alterações com as seguintes considerações:

    Propriedade Valor
    URI do script Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
    Tipo(s) de nó Cabeçalho, Trabalho, Zookeeper
    Parâmetros -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
    Persistente sim
    • Você pode especificar a frequência com que deseja que esse cluster verifique automaticamente se há atualizações. Padrão: -s “*/1 * * * *” -h 0 (neste exemplo, o cron Spark é executado a cada minuto, enquanto o cron HBase não é executado)
    • Como o cron do HBase não é configurado por padrão, você precisa executar novamente esse script ao executar o dimensionamento para o cluster do HBase. Se o cluster HBase for dimensionado com frequência, você poderá optar por configurar o trabalho cron HBase automaticamente. Por exemplo: -s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc" configura o script para fazer verificações a cada 30 minutos. Isso executará o cronograma do cron HBase periodicamente para automatizar o download de novas informações do HBase na conta de armazenamento comum para o nó local.

Observação

Esses scripts funcionam apenas em clusters HDI 5.0 e HDI 5.1.

Configurar a comunicação manualmente (opcional, caso ocorra falha no script fornecido na etapa acima)

OBSERVAÇÃO: é preciso executar estas etapas sempre que um dos clusters passa por uma atividade de dimensionamento.

  1. Copie o arquivo hbase-site.xml do armazenamento local para a raiz do armazenamento padrão do cluster Spark. Edite o comando para refletir sua configuração. Em seguida, na sessão SSH aberta para o cluster HBase, digite o comando:

    Valor da sintaxe Novo valor
    Esquema de URI Modifique para refletir o armazenamento. A sintaxe é para armazenamento de blob com transferência segura habilitada.
    SPARK_STORAGE_CONTAINER Substitua pelo nome do contêiner de armazenamento padrão usado para o cluster Spark.
    SPARK_STORAGE_ACCOUNT Substitua pelo nome da conta de armazenamento padrão usada para o cluster Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Em seguida, saia da conexão SSH para o cluster HBase.

    exit
    
  3. Conecte-se ao nó principal do cluster Spark usando o SSH. Edite o comando substituindo SPARKCLUSTER pelo nome do seu cluster Spark e insira o comando:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Insira o comando para copiar hbase-site.xml do armazenamento padrão do cluster Spark para a pasta de configuração do Spark 2 no armazenamento local do cluster:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Execute o Shell de Spark referenciando o conector HBase Spark

Depois de concluir a etapa anterior, você deve conseguir executar o shell do Spark, fazendo referência à versão apropriada do Spark HBase Connector.

Como exemplo, a tabela a seguir lista duas versões e os comandos correspondentes que a equipe do HDInsight usa atualmente. Você poderá usar versões idênticas para seus clusters caso as versões do HBase e do Spark sejam as mesmas indicadas na tabela.

  1. Na sessão SSH aberta para o cluster Spark, digite o seguinte comando para iniciar um shell do Spark:

    Versão do Spark Versão do HDI HBase Versão do SHC Comando
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Mantenha aberta essa instância do shell do Spark e continue para Definir um catálogo e uma consulta. Se você não encontrar os jars que correspondem às suas versões no repositório SHC Core, continue lendo.

Para combinações seguintes de versões do Spark e do HBase, esses artefatos não são mais publicados no repositório acima. É possível criar os jars diretamente do GitHub branch spark-hbase-connector. Por exemplo, se você estiver executando com Spark 2.4 e HBase 2.1, conclua estas etapas:

  1. Clone o repositório:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Vá para branch-2.4:

    git checkout branch-2.4
    
  3. Compile a partir do branch (cria um arquivo. jar):

    mvn clean package -DskipTests
    
  4. Execute o comando a seguir (lembre-se de alterar o nome. jar correspondente ao arquivo. jar que você criou):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Mantenha aberta essa instância do shell do Spark e continue para a próxima seção.

Definir um catálogo e uma consulta

Nesta etapa, você deve definir um catálogo que mapeia o esquema do Apache Spark para Apache HBase.

  1. No shell do Spark aberto, execute as seguintes instruções import:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Digite o comando a seguir para definir um catálogo para a tabela Contatos que você criou no HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    O código:

    1. Define um esquema de catálogo para a tabela do HBase chamada Contacts.
    2. Identifica rowkey como key e mapeia os nomes de coluna usados no Spark para a família de colunas, o nome da coluna e o tipo de coluna conforme usado no HBase.
    3. Define rowkey em detalhes como uma coluna nomeada (rowkey), que tem uma determinada família de colunas cf de rowkey.
  3. Digite o comando para definir um método que forneça um DataFrame em torno de sua tabela Contacts no HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Crie uma instância do DataFrame:

    val df = withCatalog(catalog)
    
  5. Consulte o DataFrame:

    df.show()
    

    Você deve ver duas linhas de dados:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registre uma tabela temporária para que você possa consultar a tabela do HBase usando Spark SQL:

    df.createTempView("contacts")
    
  7. Emitir uma consulta SQL em relação a contacts tabela:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Você deve ver os resultados como estes:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Inserir nova linha

  1. Para inserir um novo registro de contato, defina uma ContactRecord classe:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Crie uma instância de ContactRecord e colocá-la em uma matriz:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Salve a matriz de novos dados em HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Examine os resultados:

    df.show()
    

    Você deve ver uma saída como a abaixo:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Feche o shell do Spark digitando o seguinte comando:

    :q
    

Próximas etapas