Azure HDInsight 中 Hive Warehouse Connector 支援的 Apache Spark 作業

本文說明 Hive Warehouse Connector (HWC) 支援的 Spark 型作業。 以下顯示的所有範例都會透過 Apache Spark 殼層來執行。

必要條件

完成 Hive Warehouse Connector 設定步驟。

開始使用

若要啟動 spark-shell 工作階段,請執行下列步驟:

  1. 使用 ssh 命令來連線到 Apache Spark 叢集。 編輯以下命令並將 CLUSTERNAME 取代為您叢集的名稱,然後輸入命令:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 從 ssh 工作階段執行下列命令,以記下 hive-warehouse-connector-assembly 版本:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. 使用上面所識別的 hive-warehouse-connector-assembly 版本來編輯下列程式碼。 然後執行命令以啟動 Spark 殼層:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. 啟動 Spark 殼層之後,您可以使用下列命令來啟動 Hive Warehouse Connector 執行個體:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

使用 Hive 查詢建立 Spark 資料框架

使用 HWC 程式庫的所有查詢結果都會以資料框架的形式傳回。 下列範例示範如何建立基本 Hive 查詢。

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

查詢結果是 Spark 資料框架,可以與像是 MLIB 和 SparkSQL 的 Spark 程式庫搭配使用。

將 Spark 資料框架寫出至 Hive 資料表

Spark 原生不支援寫入 Hive 的受控 ACID 資料表。 不過,使用 HWC 時,您可以將任何資料框架寫出至 Hive 資料表。 您可以在下列範例中看到這項功能運作:

  1. 建立名為 sampletable_colorado 的資料表,並使用下列命令來指定其資料行:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. 篩選資料表 hivesampletable,其中資料行 state 等於 Colorado。 此 Hive 查詢會使用 write 函式,並傳回儲存在 Hive 資料表 sampletable_colorado 中的 Spark DataFrame 與結果。

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. 使用下列命令來檢視結果:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

結構化串流寫入

使用 Hive Warehouse Connector,您也可以使用 Spark 串流將資料寫入至 Hive 資料表。

重要

在已啟用 ESP 的 Spark 4.0 叢集中,不支援結構化串流寫入。

請遵循下列步驟,透過下列方式,將 localhost 連接埠 9999上的資料從 Spark 串流內嵌至 Hive 資料表: Hive Warehouse Connector。

  1. 從您的開啟 Spark 殼層,使用下列命令開始 Spark 串流:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. 執行下列步驟,為您建立的 Spark 串流產生資料:

    1. 在相同的 Spark 叢集中開啟第二個 SSH 工作階段。
    2. 在命令提示字元中,輸入 nc -lk 9999。 此命令會使用 netcat 公用程式,從命令列將資料傳送至指定的連接埠。
  3. 返回第一個 SSH 工作階段,並建立新的 Hive 資料表來保存串流資料。 在 spark-shell 中,輸入下列命令:

    hive.createTable("stream_table").column("value","string").create()
    
  4. 然後使用下列命令,將串流資料寫入至新建立的資料表:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    重要

    因為 Apache Spark 中有已知問題,所以 metastoreUridatabase 選項目前必須手動設定。 如需有關此問題的詳細資訊,請參閱 SPARK-25460

  5. 返回第二個 SSH 工作階段,然後輸入下列值:

    foo
    HiveSpark
    bar
    
  6. 返回第一個 SSH 工作階段,並記下簡短的活動。 請使用下列命令以檢視資料:

    hive.table("stream_table").show()
    

使用 Ctrl + C,在第二個 SSH 工作階段上停止 netcat。 使用 :q,在第一個 SSH 工作階段上結束 spark-shell。

下一步