واجهات برمجة تطبيقات موصل مستودع Hive في Azure HDInsight

تسرد هذه المقالة جميع واجهات برمجة التطبيقات التي يدعمها موصل مستودع Hive. يتم تشغيل جميع الأمثلة الموضحة أدناه باستخدام spark-shell موصل مستودع hive.

كيفية إنشاء جلسة موصل مستودع Hive:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

المتطلب الأساسي

أكمل خطوات إعداد موصل مستودع Hive .

واجهات برمجة التطبيقات المدعومة

  • قم بتعيين قاعدة البيانات:

    hive.setDatabase("<database-name>")
    
  • قائمة بجميع قواعد البيانات:

    hive.showDatabases()
    
  • سرد كافة الجداول في قاعدة البيانات الحالية

    hive.showTables()
    
  • وصف الجدول

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • قم بإسقاط قاعدة بيانات

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • قم بإسقاط جدول في قاعدة البيانات الحالية

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • إنشاء قاعدة بيانات

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • إنشاء جدول في قاعدة البيانات الحالية

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    يدعم Builder for create-table العمليات التالية فقط:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    ملاحظة

    يقوم API هذا بإنشاء جدول بتنسيق ORC في الموقع الافتراضي. بالنسبة إلى الميزات / الخيارات الأخرى أو لإنشاء جدول باستخدام استعلامات hive، استخدم executeUpdate API.

  • قراءة جدول

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • قم بتنفيذ أوامر DDL على HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • تنفيذ استعلام Hive وتحميل النتائج في مجموعة البيانات

    • تنفيذ الاستعلام LLAP. [مرشح]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • تنفيذ الاستعلام من خلال HiveServer2 عبر JDBC.

      اضبط spark.datasource.hive.warehouse.smartExecution على false في تكوينات شرارة قبل بدء جلسة شرارة لاستخدام واجهة برمجة التطبيقات هذه

      hive.execute("<hive-query>")
      
  • إغلاق جلسة موصل مستودع Hive

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • تنفيذ استعلام دمج Hive

    يقوم API هذا بإنشاء استعلام دمج Hive بالتنسيق أدناه

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    يدعم Builder العمليات التالية:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • اكتب مجموعة بيانات إلى جدول Hive دفعة واحدة

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • يجب أن يكون TableName بالشكل <db>.<table> أو <table>. إذا لم يتم توفير اسم قاعدة البيانات، فسيتم البحث / إنشاء الجدول في قاعدة البيانات الحالية

    • أنواع SaveMode هي:

      • إلحاق: لإلحاق مجموعة البيانات بالجدول المحدد

      • الكتابة فوق: الكتابة فوق البيانات في الجدول المحدد مع مجموعة البيانات

      • تجاهل: تخطي الكتابة إذا كان الجدول موجودًا بالفعل، ولم يتم طرح أي خطأ

      • ErrorIfExists: يرمي خطأ إذا كان الجدول موجودًا بالفعل

  • اكتب مجموعة بيانات إلى جدول Hive باستخدام HiveStreaming

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    ملاحظة

    دفق يكتب دائما إلحاق البيانات.

  • كتابة spark stream إلى جدول Hive

    stream.writeStream
        .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()