APIs do Hive Warehouse Connector 2.0 no Azure HDInsight

Este artigo lista todas as APIs com suporte do Hive Warehouse Connector 2.0. Todos os exemplos mostrados são de como executar usando o spark-shell e a sessão do conector do Hive Warehouse.

Como criar a sessão do Hive Warehouse Connector:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Pré-requisito

Conclua as etapas da instalação do Hive Warehouse Connector.

APIs com suporte

  • Definir o banco de dados:

    hive.setDatabase("<database-name>")
    
  • Listar todos os bancos de dados:

    hive.showDatabases()
    
  • Listar todas as tabelas no banco de dados atual

    hive.showTables()
    
  • Descrever uma tabela

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Remover um banco de dados

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Remover uma tabela do banco de dados atual

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Criar um banco de dados

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Criar uma tabela no banco de dados atual

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    O construtor da criação de tabelas dá suporte apenas às operações abaixo:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Observação

    Essa API cria uma tabela formatada em ORC no local padrão. Para ver outros recursos/opções ou criar uma tabela com consultas do Hive, use a API executeUpdate.

  • Ler uma tabela

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Executar comandos DDL no HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Executar a consulta do Hive e carregar o resultado em um conjunto de dados

    • Como executar consultas por meio de daemons LLAP. [Recomendado]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Como executar consultas com o HiveServer2 por meio do JDBC.

      Definir spark.datasource.hive.warehouse.smartExecution como false nas configurações do Spark antes de iniciar a sessão do Spark para usar essa API

      hive.execute("<hive-query>")
      
  • Encerrar a sessão do Hive Warehouse Connector

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Executar consultas de mesclagem do Hive

    Esta API cria uma consulta de mesclagem do Hive no formato

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    O construtor dá suporte às seguintes operações:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Gravar um conjuntos de dados na tabela do Hive em lote

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • O TableName deve ter o formato <db>.<table> ou <table>. Se nenhum nome de banco de dados for fornecido, a tabela será pesquisada/criada no banco de dados atual

    • Os tipos de SaveMode são:

      • Anexar: anexa o conjuntos de dados à tabela especificada

      • Substituir: substitui os dados na tabela especificada por um conjuntos de dados

      • Ignorar: ignora a gravação caso a tabela já exista; nenhum erro é lançado

      • ErrorIfExists: lançará um erro se a tabela já existir

  • Gravar um conjunto de dados na tabela do Hive usando HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Observação

    As gravações de fluxo sempre acrescentam dados.

  • Como gravar um fluxo do Spark em uma tabela do Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Próximas etapas