Usare Apache Spark per leggere e scrivere dati Apache HBase

Le query in Apache HBase vengono in genere eseguite con l'API di basso livello corrispondente (scan, get e put) o con una sintassi SQL tramite Apache Phoenix. Apache fornisce anche il connettore Apache Spark HBase. Il connettore è un'alternativa conveniente ed efficiente per eseguire query e modificare i dati archiviati da HBase.

Prerequisiti

  • Due cluster HDInsight separati distribuiti nella stessa rete virtuale. Una HBase e una Spark con almeno Spark 2.1 (HDInsight 3.6) installata. Per altre informazioni, vedere Creare cluster basati su Linux in HDInsight tramite il portale di Azure.

  • Lo schema URI per l'archiviazione primaria dei cluster. Questo schema sarebbe wasb:// per Archiviazione BLOB di Azure, abfs:// per Azure Data Lake Storage Gen2 o adl:// per Azure Data Lake Storage Gen1. Se il trasferimento sicuro è abilitato per l'archiviazione BLOB, l'URI sarà wasbs://. Vedere anche l'articolo sul trasferimento sicuro.

Processo generale

Il processo generale per abilitare il cluster Spark per eseguire query sul cluster HBase è il seguente:

  1. Preparare alcuni dati di esempio in HBase.
  2. Acquisire il file hbase-site.xml dalla cartella di configurazione del cluster HBase (/etc/hbase/conf) e inserire una copia di hbase-site.xml nella cartella di configurazione di Spark 2 (/etc/spark2/conf). (FACOLTATIVO: usare lo script fornito dal team DI HDInsight per automatizzare questo processo)
  3. Eseguire spark-shell facendo riferimento al connettore HBase Spark dalle relative coordinate Maven nell'opzione packages.
  4. Definire un catalogo corrispondente allo schema da Spark a HBase.
  5. Interagire con i dati di HBase tramite le API RDD o DataFrame.

Preparare i dati di esempio in Apache HBase

In questo passaggio viene creata e popolata una tabella in Apache HBase che è quindi possibile eseguire query usando Spark.

  1. Usare il ssh comando per connettersi al cluster HBase. Modificare il comando sostituendo HBASECLUSTER con il nome del cluster HBase e quindi immettere il comando:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. Usare il comando per avviare la hbase shell shell interattiva HBase. Immettere il comando seguente nella connessione SSH:

    hbase shell
    
  3. Usare il create comando per creare una tabella HBase con famiglie a due colonne. Immettere il comando seguente:

    create 'Contacts', 'Personal', 'Office'
    
  4. Usare il put comando per inserire valori in una colonna specificata in una riga specificata in una determinata tabella. Immettere il comando seguente:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Usare il comando per arrestare la exit shell interattiva HBase. Immettere il comando seguente:

    exit
    

Eseguire script per configurare la connessione tra cluster

Per configurare la comunicazione tra cluster, seguire la procedura per eseguire due script nei cluster. Questi script automatizzano il processo di copia dei file descritto nella sezione "Configurare manualmente la comunicazione".

  • Lo script eseguito dal cluster HBase caricherà hbase-site.xml e le informazioni sul mapping IP di HBase all'archiviazione predefinita collegata al cluster Spark.
  • Lo script eseguito dal cluster Spark configura due processi cron per eseguire periodicamente due script helper:
    1. Processo HBase cron: scaricare nuovi hbase-site.xml file e mapping IP HBase dall'account di archiviazione predefinito spark al nodo locale
    2. Processo spark cron: verifica se si è verificato un ridimensionamento spark e se il cluster è sicuro. In tal caso, modificare /etc/hosts per includere il mapping IP HBase archiviato in locale

NOTA: prima di procedere, assicurarsi di aver aggiunto l'account di archiviazione del cluster Spark al cluster HBase come account di archiviazione secondario. Assicurarsi che gli script siano ordinati come indicato.

  1. Usare l'azione script nel cluster HBase per applicare le modifiche con le considerazioni seguenti:

    Proprietà Valore
    URI script Bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    Tipo/i di nodo Region
    Parametri -s SECONDARYS_STORAGE_URL
    Persisted
    • SECONDARYS_STORAGE_URL è l'URL dell'archiviazione predefinita sul lato Spark. Esempio di parametro: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. Usare l'azione script nel cluster Spark per applicare le modifiche con le considerazioni seguenti:

    Proprietà Valore
    URI script Bash https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    Tipo/i di nodo Head, Worker, Zookeeper
    Parametri -s "SPARK-CRON-SCHEDULE" (facoltativo) -h "HBASE-CRON-SCHEDULE" (facoltativo)
    Persisted
    • È possibile specificare la frequenza con cui si vuole che questo cluster verifichi automaticamente se l'aggiornamento. Impostazione predefinita: -s "*/1 * * " -h 0 (In questo esempio, spark cron viene eseguito ogni minuto, mentre il cron HBase non viene eseguito)
    • Poiché HBase cron non è configurato per impostazione predefinita, è necessario eseguire nuovamente questo script durante l'esecuzione del ridimensionamento nel cluster HBase. Se spesso il cluster HBase viene ridimensionato, è possibile scegliere di configurare automaticamente il processo HBase cron. Ad esempio: -h "*/30 * * * *" configura lo script per eseguire controlli ogni 30 minuti. Questa operazione eseguirà periodicamente la pianificazione cron di HBase per automatizzare il download delle nuove informazioni HBase sull'account di archiviazione comune nel nodo locale.

Configurare manualmente la comunicazione (facoltativo, se specificato script nel passaggio precedente ha esito negativo)

NOTA: Questi passaggi devono eseguire ogni volta che uno dei cluster subisce un'attività di ridimensionamento.

  1. Copiare il hbase-site.xml dall'archiviazione locale alla radice dell'archiviazione predefinita del cluster Spark. Modificare il comando per riflettere la configurazione. Quindi, dalla sessione SSH aperta al cluster HBase immettere il comando:

    Valore della sintassi Nuovo valore
    Schema URI Modificare per riflettere l'archiviazione. La sintassi è per l'archiviazione BLOB con trasferimento sicuro abilitato.
    SPARK_STORAGE_CONTAINER Sostituire con il nome del contenitore di archiviazione predefinito usato per il cluster Spark.
    SPARK_STORAGE_ACCOUNT Sostituire con il nome dell'account di archiviazione predefinito usato per il cluster Spark.
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. Uscire quindi dalla connessione SSH al cluster HBase.

    exit
    
  3. Connettersi al nodo head del cluster Spark tramite SSH. Modificare il comando sostituendo SPARKCLUSTER con il nome del cluster Spark e quindi immettere il comando:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. Immettere il comando da copiare hbase-site.xml dall'archiviazione predefinita del cluster Spark nella cartella di configurazione di Spark 2 nell'archiviazione locale del cluster:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Eseguire la shell di Spark facendo riferimento al connettore HBase Spark

Dopo aver completato il passaggio precedente, è necessario poter eseguire la shell Spark, facendo riferimento alla versione appropriata di Spark HBase Connector.

Ad esempio, la tabella seguente elenca due versioni e i comandi corrispondenti usati dal team HDInsight. È possibile usare le stesse versioni per i cluster se le versioni di HBase e Spark sono uguali a quanto indicato nella tabella.

  1. Nella sessione SSH aperta al cluster Spark immettere il comando seguente per avviare una shell Spark:

    Versione di Spark Versione di HDI HBase Versione di SHC Comando
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. Mantenere aperta l'istanza della shell Spark e continuare a Definire un catalogo e una query. Se non si trovano i file JAR corrispondenti alle versioni nel repository SHC Core, continuare la lettura.

Per le combinazioni successive di versioni di Spark e HBase, questi artefatti non vengono più pubblicati nel repository precedente. È possibile compilare i file JAR direttamente dal ramo GitHub spark-hbase-connector . Ad esempio, se si esegue con Spark 2.4 e HBase 2.1, completare questi passaggi:

  1. Clonare il repository:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Passare a branch-2.4:

    git checkout branch-2.4
    
  3. Compilazione dal ramo (crea un file con estensione jar):

    mvn clean package -DskipTests
    
  4. Eseguire il comando seguente (assicurarsi di modificare il nome con estensione jar corrispondente al file con estensione jar creato):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. Mantenere aperta l'istanza della shell Spark e continuare con la sezione successiva.

Definire un catalogo e una query

In questo passaggio, definire un oggetto catalogo corrispondente allo schema da Apache Spark ad Apache HBase.

  1. Nella shell Spark aperta immettere le istruzioni seguenti import :

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Immettere il comando seguente per definire un catalogo per la tabella Contatti creata in HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    Il codice:

    1. Definisce uno schema del catalogo per la tabella HBase denominata Contacts.
    2. Identifica la chiave di riga come keye esegue il mapping dei nomi di colonna usati in Spark alla famiglia di colonne, al nome della colonna e al tipo di colonna usato in HBase.
    3. Definisce la chiave di riga in dettaglio come colonna denominata (rowkey), con una famiglia cf di colonne specifica di rowkey.
  3. Immettere il comando per definire un metodo che fornisce un dataframe intorno alla Contacts tabella in HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Creare un'istanza del DataFrame:

    val df = withCatalog(catalog)
    
  5. Recuperare il DataFrame:

    df.show()
    

    Dovrebbero essere visualizzate due righe di dati:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. Registrare una tabella temporanea in modo che sia possibile eseguire query nella tabella HBase tramite Spark SQL:

    df.createTempView("contacts")
    
  7. Eseguire una query SQL sulla tabella contacts:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    Dovrebbero essere visualizzati risultati simili ai seguenti:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

Inserire nuovi dati

  1. Per inserire un nuovo record di contatto, definire una classe ContactRecord:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Creare un'istanza di ContactRecord e inserirlo in una matrice:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Salvare la matrice dei nuovi dati in HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Esaminare i risultati:

    df.show()
    

    Verrà visualizzato un output simile al seguente:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. Chiudere la shell spark immettendo il comando seguente:

    :q
    

Passaggi successivi