使用 Apache Spark 來讀取和寫入 Apache HBase 資料

Apache HBase 通常會使用其低階 API (scan、get、put) 或者使用 Apache Phoenix 以 SQL 語法來查詢。 Apache 也提供 Apache Spark HBase Connector。 Connector 是查詢及修改 HBase 所儲存資料的方便且高效率替代方式。

必要條件

  • 在相同虛擬網路中部署兩個不同的 HDInsight 叢集。 一個 HBase,以及一個至少已安裝 Spark 2.1 (HDInsight 3.6) 的 Spark。 如需詳細資訊,請參閱使用 Azure 入口網站在 HDInsight 中建立以 Linux 為基礎的叢集

  • 您叢集主要儲存體的 URI 配置。 此配置將是適用於 Azure Blob 儲存體的 wasb://、適用於 Azure Data Lake Storage Gen2 的 abfs://,或適用於 Azure Data Lake Storage Gen1 的 adl://。 如果已對 Blob 儲存體啟用安全傳輸,URI 會是 wasbs://。 另請參閱安全傳輸

整體程序

啟用您的 Spark 叢集以查詢 HBase 叢集的高階程序如下:

  1. 在 HBase 中準備一些範例資料。
  2. 從您的 HBase 叢集設定資料夾 (/etc/hbase/conf) 取得 hbase-site.xml 檔案,並將 hbase-site.xml 的副本放置在 Spark 2 設定資料夾 (/etc/spark2/conf)。 (選用:使用 HDInsight 小組提供的指令碼將此程序自動化)
  3. 根據 packages 選項中的 Maven 座標,執行參照 Spark HBase Connector 的 spark-shell
  4. 定義目錄,該目錄將結構描述從 Spark 對應至 HBase。
  5. 使用 RDD 或 DataFrame API 與 HBase 資料進行互動。

在 Apache HBase 中準備範例資料

在此步驟中,您會在 Apache HBase 中建立並填入資料表,然後您可以使用 Spark 查詢。

  1. 使用 ssh 命令來連線至您的 HBase 叢集。 編輯命令並將 HBASECLUSTER 取代為 HBase 叢集的名稱,然後輸入命令:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
    
  2. 使用 hbase shell 命令來啟動 HBase 互動式殼層。 在您的 SSH 連線中輸入下列命令:

    hbase shell
    
  3. 使用 create 命令來建立含兩個資料行系列的 HBase 資料表。 輸入下列命令:

    create 'Contacts', 'Personal', 'Office'
    
  4. 使用 put 命令來將值插入特定資料表中之指定資料列的指定資料行。 輸入下列命令:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. 使用 exit 命令來停止 HBase 互動式殼層。 輸入下列命令:

    exit
    

執行指令碼以設定叢集之間的連線

若要設定叢集之間的通訊,請遵循下列步驟在叢集上執行兩個指令碼。 這些指令碼會將<手動設定通訊>一節中所述的檔案複製程序自動化。

  • 您從 HBase 叢集執行的指令碼會將 hbase-site.xml 和 HBase IP 對應資訊上傳至連結至 Spark 叢集的預設儲存體。
  • 您從 Spark 叢集執行的指令碼會設定兩個 cron 作業,以定期執行兩個協助程式指令碼:
    1. HBase cron 作業 – 從 Spark 預設儲存體帳戶下載新的 hbase-site.xml 檔案和 HBase IP 對應到本機節點
    2. Spark cron 作業 – 檢查是否發生 Spark 縮放,以及叢集是否安全。 如果是,請編輯 /etc/hosts 以包含儲存在本機的 HBase IP 對應

注意:繼續之前,請確定您已將 Spark 叢集的儲存體帳戶新增至 HBase 叢集做為次要儲存體帳戶。 請確定您的指令碼順序如下。

  1. 在 HBase 叢集上使用指令碼動作,以下列考量來套用變更:

    屬性
    Bash 指令碼 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    節點類型 區域
    參數 -s SECONDARYS_STORAGE_URL
    已保存
    • SECONDARYS_STORAGE_URL 是 Spark 端預設儲存體的 URL。 參數範例:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. 在 Spark 叢集上使用指令碼動作,以下列考量來套用變更:

    屬性
    Bash 指令碼 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    節點類型 Head、Worker、Zookeeper
    參數 -s "SPARK-CRON-SCHEDULE" (選擇性) -h "HBASE-CRON-SCHEDULE" (選擇性)
    已保存
    • 您可以指定此叢集自動檢查更新的頻率。 預設值:-s “*/1 * * * *” -h 0 (在此範例中,Spark cron 會每分鐘執行一次,而 HBase cron 不會執行)
    • 由於預設不會設定 HBase cron,因此您必須在執行 HBase 叢集的調整時,重新執行此指令碼。 如果您的 HBase 叢集經常調整,您可以選擇自動設定 HBase cron 作業。 例如:-h "*/30 * * * *" 會設定指令碼每隔 30 分鐘執行檢查一次。 這會定期執行 HBase cron 排程,以自動將常見儲存體帳戶上的新 HBase 資訊下載至本機節點。

手動設定通訊 (選擇性,如果上述步驟中的指令碼失敗)

注意:每當其中一個叢集進行調整活動時,都必須執行這些步驟。

  1. 將 hbase-site.xml 從本機儲存體複製到 Spark 叢集預設儲存體的根目錄。 編輯命令以反映您的設定。 然後,從 HBase 叢集的開啟 SSH 工作階段輸入命令:

    語法值 新值
    URI 配置 修改以反映您的儲存體。 語法適用於已啟用安全傳輸的 Blob 儲存體。
    SPARK_STORAGE_CONTAINER 取代為用於 Spark 叢集的預設儲存體容器名稱。
    SPARK_STORAGE_ACCOUNT 取代為用於 Spark 叢集的預設儲存體帳戶名稱。
    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
    
  2. 然後結束與 HBase 叢集的 SSH 連線。

    exit
    
  3. 使用 SSH 連線到 Spark 叢集的前端節點。 編輯以下命令並將 SPARKCLUSTER 取代為 Spark 叢集的名稱,然後輸入命令:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
    
  4. 輸入命令,將 hbase-site.xml 從您的 Spark 叢集預設儲存體複製到叢集本機儲存體上的 Spark 2 設定資料夾:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

執行參照 Spark HBase Connector 的 Spark Shell

完成上述步驟之後,您應該能夠執行 Spark 殼層,並參考適當的 Spark HBase Connector 版本。

例如,下表列出 HDInsight 小組目前使用的兩個版本和對應的命令。 如果 HBase 和 Spark 的版本與資料表中所指示的版本相同,您可以針對叢集使用相同的版本。

  1. 在 Spark 叢集的開啟 SSH 工作階段中,輸入下列命令以啟動 Spark 殼層:

    Spark 版本 HDI HBase 版本 SHC 版本 Command
    2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
  2. 讓此 Spark 殼層執行個體保持開啟,並繼續定義目錄和查詢。 如果您找不到對應至 SHC Core 存放庫中版本的 jar,請繼續閱讀。

針對 Spark 和 HBase 版本的後續組合,這些成品不再於上述存放庫上發佈。 您可以直接從 spark-hbase-connector GitHub 分支建置 jar。 例如,如果您使用 Spark 2.4 和 HBase 2.1 執行,請完成下列步驟:

  1. 複製存放庫:

    git clone https://github.com/hortonworks-spark/shc
    
  2. 移至 branch-2.4:

    git checkout branch-2.4
    
  3. 從分支建置 (會建立 .jar 檔案):

    mvn clean package -DskipTests
    
  4. 執行下列命令 (請務必變更對應至所建置 .jar 檔案的 .jar 名稱):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
    
  5. 讓此 Spark 殼層執行個體保持開啟,並且繼續進行下一節。

定義目錄和查詢

在這個步驟中,您會定義目錄物件,該目錄物件將結構描述從 Apache Spark 對應至 Apache HBase。

  1. 在開啟的 Spark Shell 中,輸入下列 import 陳述式:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. 輸入下列命令,以定義您在 HBase 中建立的 Contacts 資料表目錄:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    程式碼:

    1. 針對名為 Contacts 的 HBase 資料表定義目錄結構描述。
    2. 將 rowkey 識別為 key,並且將 Spark 中使用的資料行名稱對應至 HBase 中使用的資料行系列、資料行名稱以及資料行類型。
    3. 將 rowkey 詳細定義為具名資料行 (rowkey),其具有 rowkey 的特定資料行系列 cf
  3. 輸入命令以定義方法,該方法提供 HBase 中 Contacts 資料表周圍的 DataFrame:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. 建立 DataFrame 的執行個體:

    val df = withCatalog(catalog)
    
  5. 查詢 DataFrame:

    df.show()
    

    您應該會看到兩個資料列的資料:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
    
  6. 註冊暫存資料表,以便使用 Spark SQL 查詢 HBase 資料表:

    df.createTempView("contacts")
    
  7. 根據 contacts 資料表發出 SQL 查詢:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    

    您應該會看到如下的結果:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
    

插入新資料

  1. 若要插入新的連絡人記錄,請定義 ContactRecord 類別:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. 建立 ContactRecord 的執行個體並將其放置在陣列中:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. 將新資料的陣列儲存至 HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. 檢查結果︰

    df.show()
    

    您應該會看到如下的輸出:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
    
  5. 輸入下列命令以關閉 Spark 殼層:

    :q
    

下一步