使用 Apache Spark 來讀取和寫入 Apache HBase 資料
Apache HBase 通常會使用其低階 API (scan、get、put) 或者使用 Apache Phoenix 以 SQL 語法來查詢。 Apache 也提供 Apache Spark HBase Connector。 Connector 是查詢及修改 HBase 所儲存資料的方便且高效率替代方式。
必要條件
在相同虛擬網路中部署兩個不同的 HDInsight 叢集。 一個 HBase,以及一個至少已安裝 Spark 2.1 (HDInsight 3.6) 的 Spark。 如需詳細資訊,請參閱使用 Azure 入口網站在 HDInsight 中建立以 Linux 為基礎的叢集。
您叢集主要儲存體的 URI 配置。 此配置將是適用於 Azure Blob 儲存體的 wasb://、適用於 Azure Data Lake Storage Gen2 的
abfs://
,或適用於 Azure Data Lake Storage Gen1 的 adl://。 如果已對 Blob 儲存體啟用安全傳輸,URI 會是wasbs://
。 另請參閱安全傳輸。
整體程序
啟用您的 Spark 叢集以查詢 HBase 叢集的高階程序如下:
- 在 HBase 中準備一些範例資料。
- 從您的 HBase 叢集設定資料夾 (/etc/hbase/conf) 取得 hbase-site.xml 檔案,並將 hbase-site.xml 的副本放置在 Spark 2 設定資料夾 (/etc/spark2/conf)。 (選用:使用 HDInsight 小組提供的指令碼將此程序自動化)
- 根據
packages
選項中的 Maven 座標,執行參照 Spark HBase Connector 的spark-shell
。 - 定義目錄,該目錄將結構描述從 Spark 對應至 HBase。
- 使用 RDD 或 DataFrame API 與 HBase 資料進行互動。
在 Apache HBase 中準備範例資料
在此步驟中,您會在 Apache HBase 中建立並填入資料表,然後您可以使用 Spark 查詢。
使用
ssh
命令來連線至您的 HBase 叢集。 編輯命令並將HBASECLUSTER
取代為 HBase 叢集的名稱,然後輸入命令:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
使用
hbase shell
命令來啟動 HBase 互動式殼層。 在您的 SSH 連線中輸入下列命令:hbase shell
使用
create
命令來建立含兩個資料行系列的 HBase 資料表。 輸入下列命令:create 'Contacts', 'Personal', 'Office'
使用
put
命令來將值插入特定資料表中之指定資料列的指定資料行。 輸入下列命令:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
使用
exit
命令來停止 HBase 互動式殼層。 輸入下列命令:exit
執行指令碼以設定叢集之間的連線
若要設定叢集之間的通訊,請遵循下列步驟在叢集上執行兩個指令碼。 這些指令碼會將<手動設定通訊>一節中所述的檔案複製程序自動化。
- 您從 HBase 叢集執行的指令碼會將
hbase-site.xml
和 HBase IP 對應資訊上傳至連結至 Spark 叢集的預設儲存體。 - 您從 Spark 叢集執行的指令碼會設定兩個 cron 作業,以定期執行兩個協助程式指令碼:
- HBase cron 作業 – 從 Spark 預設儲存體帳戶下載新的
hbase-site.xml
檔案和 HBase IP 對應到本機節點 - Spark cron 作業 – 檢查是否發生 Spark 縮放,以及叢集是否安全。 如果是,請編輯
/etc/hosts
以包含儲存在本機的 HBase IP 對應
- HBase cron 作業 – 從 Spark 預設儲存體帳戶下載新的
注意:繼續之前,請確定您已將 Spark 叢集的儲存體帳戶新增至 HBase 叢集做為次要儲存體帳戶。 請確定您的指令碼順序如下。
在 HBase 叢集上使用指令碼動作,以下列考量來套用變更:
屬性 值 Bash 指令碼 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
節點類型 區域 參數 -s SECONDARYS_STORAGE_URL
已保存 是 SECONDARYS_STORAGE_URL
是 Spark 端預設儲存體的 URL。 參數範例:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
在 Spark 叢集上使用指令碼動作,以下列考量來套用變更:
屬性 值 Bash 指令碼 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
節點類型 Head、Worker、Zookeeper 參數 -s "SPARK-CRON-SCHEDULE"
(選擇性)-h "HBASE-CRON-SCHEDULE"
(選擇性)已保存 是 - 您可以指定此叢集自動檢查更新的頻率。 預設值:-s “*/1 * * * *” -h 0 (在此範例中,Spark cron 會每分鐘執行一次,而 HBase cron 不會執行)
- 由於預設不會設定 HBase cron,因此您必須在執行 HBase 叢集的調整時,重新執行此指令碼。 如果您的 HBase 叢集經常調整,您可以選擇自動設定 HBase cron 作業。 例如:
-h "*/30 * * * *"
會設定指令碼每隔 30 分鐘執行檢查一次。 這會定期執行 HBase cron 排程,以自動將常見儲存體帳戶上的新 HBase 資訊下載至本機節點。
手動設定通訊 (選擇性,如果上述步驟中的指令碼失敗)
注意:每當其中一個叢集進行調整活動時,都必須執行這些步驟。
將 hbase-site.xml 從本機儲存體複製到 Spark 叢集預設儲存體的根目錄。 編輯命令以反映您的設定。 然後,從 HBase 叢集的開啟 SSH 工作階段輸入命令:
語法值 新值 URI 配置 修改以反映您的儲存體。 語法適用於已啟用安全傳輸的 Blob 儲存體。 SPARK_STORAGE_CONTAINER
取代為用於 Spark 叢集的預設儲存體容器名稱。 SPARK_STORAGE_ACCOUNT
取代為用於 Spark 叢集的預設儲存體帳戶名稱。 hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
然後結束與 HBase 叢集的 SSH 連線。
exit
使用 SSH 連線到 Spark 叢集的前端節點。 編輯以下命令並將
SPARKCLUSTER
取代為 Spark 叢集的名稱,然後輸入命令:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
輸入命令,將
hbase-site.xml
從您的 Spark 叢集預設儲存體複製到叢集本機儲存體上的 Spark 2 設定資料夾:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
執行參照 Spark HBase Connector 的 Spark Shell
完成上述步驟之後,您應該能夠執行 Spark 殼層,並參考適當的 Spark HBase Connector 版本。
例如,下表列出 HDInsight 小組目前使用的兩個版本和對應的命令。 如果 HBase 和 Spark 的版本與資料表中所指示的版本相同,您可以針對叢集使用相同的版本。
在 Spark 叢集的開啟 SSH 工作階段中,輸入下列命令以啟動 Spark 殼層:
Spark 版本 HDI HBase 版本 SHC 版本 Command 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
讓此 Spark 殼層執行個體保持開啟,並繼續定義目錄和查詢。 如果您找不到對應至 SHC Core 存放庫中版本的 jar,請繼續閱讀。
針對 Spark 和 HBase 版本的後續組合,這些成品不再於上述存放庫上發佈。 您可以直接從 spark-hbase-connector GitHub 分支建置 jar。 例如,如果您使用 Spark 2.4 和 HBase 2.1 執行,請完成下列步驟:
複製存放庫:
git clone https://github.com/hortonworks-spark/shc
移至 branch-2.4:
git checkout branch-2.4
從分支建置 (會建立 .jar 檔案):
mvn clean package -DskipTests
執行下列命令 (請務必變更對應至所建置 .jar 檔案的 .jar 名稱):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
讓此 Spark 殼層執行個體保持開啟,並且繼續進行下一節。
定義目錄和查詢
在這個步驟中,您會定義目錄物件,該目錄物件將結構描述從 Apache Spark 對應至 Apache HBase。
在開啟的 Spark Shell 中,輸入下列
import
陳述式:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
輸入下列命令,以定義您在 HBase 中建立的 Contacts 資料表目錄:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
程式碼:
- 針對名為
Contacts
的 HBase 資料表定義目錄結構描述。 - 將 rowkey 識別為
key
,並且將 Spark 中使用的資料行名稱對應至 HBase 中使用的資料行系列、資料行名稱以及資料行類型。 - 將 rowkey 詳細定義為具名資料行 (
rowkey
),其具有rowkey
的特定資料行系列cf
。
- 針對名為
輸入命令以定義方法,該方法提供 HBase 中
Contacts
資料表周圍的 DataFrame:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
建立 DataFrame 的執行個體:
val df = withCatalog(catalog)
查詢 DataFrame:
df.show()
您應該會看到兩個資料列的資料:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
註冊暫存資料表,以便使用 Spark SQL 查詢 HBase 資料表:
df.createTempView("contacts")
根據
contacts
資料表發出 SQL 查詢:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
您應該會看到如下的結果:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
插入新資料
若要插入新的連絡人記錄,請定義
ContactRecord
類別:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
建立
ContactRecord
的執行個體並將其放置在陣列中:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
將新資料的陣列儲存至 HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
檢查結果︰
df.show()
您應該會看到如下的輸出:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
輸入下列命令以關閉 Spark 殼層:
:q