Bagikan melalui


API Hive Warehouse Connector 2.0 di Azure HDInsight

Artikel ini mencantumkan semua API yang didukung oleh Hive warehouse connector 2.0. Semua contoh yang ditunjukkan adalah cara menjalankan menggunakan spark-shell dan sesi konektor gudang sarang.

Cara membuat sesi Hive Warehouse Connector:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Prasyarat

Selesaikan langkah-langkah Penyiapan Hive Warehouse Connector.

API yang Didukung

  • Mengatur database:

    hive.setDatabase("<database-name>")
    
  • Daftar semua database:

    hive.showDatabases()
    
  • Daftar semua tabel dalam database saat ini

    hive.showTables()
    
  • Menjelaskan tabel

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Letakkan Database

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Letakkan tabel di database saat ini

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Membuat database

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Buat tabel di database saat ini

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Builder untuk pembuatan tabel hanya mendukung operasi di bawah ini:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Catatan

    API ini membuat tabel yang diformat ORC di lokasi default. Untuk fitur/opsi lain atau untuk membuat tabel menggunakan kueri Hive, gunakan API executeUpdate.

  • Membaca tabel

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Jalankan perintah DDL di HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Jalankan kueri Hive dan hasil muat di Dataset

    • Mengeksekusi kueri melalui daemon LLAP. [Disarankan]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Mengeksekusi kueri melalui HiveServer2 via JDBC.

      Setel spark.datasource.hive.warehouse.smartExecution ke false dalam konfigurasi spark sebelum memulai sesi spark untuk menggunakan API ini

      hive.execute("<hive-query>")
      
  • Menutup sesi Hive Warehouse Connector

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Jalankan kueri Gabungan Hive

    API ini membuat kueri penggabungan Apache Hive dalam format

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Builder mendukung operasi berikut:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Menulis Dataset ke Tabel Hive dalam batch

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName harus dalam bentuk <db>.<table> atau <table>. Jika tidak ada nama database yang disediakan, tabel akan dicari/dibuat dalam database saat ini

    • Tipe SaveMode adalah:

      • Menambahkan: Menambahkan dataset ke tabel yang diberikan

      • Menimpa: Menimpa data dalam tabel yang diberikan dengan dataset

      • Abaikan: Lewati menulis jika tabel sudah ada, tidak ada kesalahan yang dibuang

      • ErrorIfExists: Membuang kesalahan jika tabel sudah ada

  • Tulis Dataset ke Tabel Apache Hive menggunakan HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Catatan

    Menulis stream selalu menambahkan data.

  • Menulis stream spark ke Tabel Apache Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Langkah berikutnya