API de Hive Warehouse Connector 2.0 en Azure HDInsight

En este artículo se enumeran todas las API que admite Hive Warehouse Connector 2.0. Todos los ejemplos que se muestran explican cómo ejecutar usando un shell de Spark y una sesión de Hive Warehouse Connector.

Creación de una sesión de Hive Warehouse Connector:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Requisito previo

Complete los pasos de configuración de Hive Warehouse Connector.

API admitidas

  • Establecimiento de la base de datos

    hive.setDatabase("<database-name>")
    
  • Enumeración de todas las bases de datos

    hive.showDatabases()
    
  • Enumeración de todas las tablas de la base de datos actual

    hive.showTables()
    
  • Descripción de una tabla

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Quitar una base de datos

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Eliminación de una tabla de la base de datos actual

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Crear una base de datos

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Creación de una tabla en la base de datos actual

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    El generador para crear tablas solo admite las siguientes operaciones:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Nota:

    Esta API crea una tabla con formato ORC en la ubicación predeterminada. Para conocer otras características u opciones o para crear una tabla usando consultas de Hive, use la API executeUpdate.

  • Lectura de una tabla

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Ejecución de comandos DDL en HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Ejecución de una consulta de Hive y carga del resultado en el conjunto de datos

    • Ejecución de una consulta a través de demonios LLAP (opción recomendada)

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Ejecución de una consulta a través de HiveServer2 mediante JDBC

      Establecimiento de spark.datasource.hive.warehouse.smartExecution en false en configuraciones de Spark antes de iniciar una sesión en la plataforma para usar esta API

      hive.execute("<hive-query>")
      
  • Cierre de una sesión de Hive Warehouse Connector

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Ejecución de una consulta de combinación de Hive

    Esta API crea una consulta de combinación de Hive con el formato

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    El generador admite las siguientes operaciones:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Escritura de un conjunto de datos en una tabla de Hive por lotes

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName debe tener el formato <db>.<table> o <table>. Si no se proporciona ningún nombre de base de datos, la tabla buscará o creará en la base de datos actual.

    • Los tipos de SaveMode son los siguientes:

      • Append: anexa el conjunto de datos a la tabla dada.

      • Overwrite: sobrescribe los datos de la tabla dada con el conjunto de datos.

      • Ignore: omite la escritura si la tabla ya existe, y no se produce ningún error.

      • ErrorIfExists: se produce un error si ya existe la tabla.

  • Escritura de un conjunto de datos en una tabla de Hive usando HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Nota:

    Al escribir secuencias siempre se anexan datos.

  • Escritura de una secuencia de Spark en una tabla de Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Pasos siguientes