Azure HDInsight 中的 Hive Warehouse Connector API

本文列出 Hive Warehouse Connector 支援的所有 API。 下列所有範例都是使用 Spark Shell 和 Hive Warehouse Connector 工作階段執行的。

如何建立 Hive Warehouse Connector 工作階段:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

必要條件

完成 Hive Warehouse Connector 設定步驟。

支援的 API

  • 設定資料庫:

    hive.setDatabase("<database-name>")
    
  • 列出所有資料庫:

    hive.showDatabases()
    
  • 列出目前資料庫中的所有資料表

    hive.showTables()
    
  • 描述資料表

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • 卸除資料庫

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • 卸除目前資料庫中的資料表

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • 建立資料庫

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • 在目前的資料庫中建立資料表

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    create-table 的產生器僅支援下列作業:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    注意

    此 API 會在預設位置建立 ORC 格式化資料表。 若要使用其他功能/選項,或使用 Hive 查詢建立資料表,請使用 executeUpdate API。

  • 讀取資料表

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • 在 HiveServer2 上執行 DDL 命令

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • 執行 Hive 查詢並在資料集中載入結果

    • 透過 LLAP 精靈執行查詢。 [建議]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • 使用 HiveServer2 透過 JDBC 執行查詢。

      先在 Spark 設定中將 spark.datasource.hive.warehouse.smartExecution 設為 false,再啟動 Spark 工作階段以使用此 API

      hive.execute("<hive-query>")
      
  • 關閉 Hive Warehouse Connector 工作階段

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • 執行 Hive 合併查詢

    此 API 會建立下列格式的 Hive 合併查詢

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    產生器支援下列作業:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • 以批次方式將資料集寫入至 Hive 資料表

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName 的格式應為 <db>.<table><table>。 若未提供資料庫名稱,則會在目前的資料庫中搜尋/建立資料表

    • SaveMode 類型為:

      • 附加:將資料集附加至指定的資料表

      • 覆寫:使用資料集覆寫指定資料表中的資料

      • 忽略:如果資料表已存在則略過寫入,不會擲回錯誤

      • ErrorIfExists:如果資料表已存在則擲回錯誤

  • 使用 HiveStreaming 將資料集寫入至 Hive 資料表

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    注意

    資料流寫入一律會附加資料。

  • 將 Spark 資料流寫入至 Hive 資料表

    stream.writeStream
        .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()