API Hive Warehouse Connector dans Azure HDInsight
Cet article liste toutes les API prises en charge par Hive Warehouse Connector. Tous les exemples présentés ci-dessous sont exécutés à l’aide de l’interpréteur de commandes Spark et d’une session Hive Warehouse Connector.
Comment créer une session Hive Warehouse Connector :
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Configuration requise
Suivez les étapes de Configuration de Hive Warehouse Connector.
API prises en charge
Définir la base de données :
hive.setDatabase("<database-name>")
Lister toutes les bases de données :
hive.showDatabases()
Lister toutes les tables de la base de données active
hive.showTables()
Décrire une table
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Déposer une base de données
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Supprimer une table de la base de données active
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Création d'une base de données
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Créer une table dans la base de données active
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Le générateur pour la création de table prend en charge uniquement les opérations ci-dessous :
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Notes
Cette API crée une table au format ORC à l’emplacement par défaut. Pour d’autres fonctionnalités/options ou pour créer une table à l’aide de requêtes Hive, utilisez l’API
executeUpdate
.Lire une table
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Exécuter des commandes DDL sur HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw excpetion in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Exécuter une requête Hive et charger le résultat dans un jeu de données
Exécution d’une requête par le biais de démons LLAP. [Recommandé]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Exécution d’une requête par le biais de HiveServer2 via JDBC.
Définir
spark.datasource.hive.warehouse.smartExecution
surfalse
dans les configurations Spark avant de démarrer la session Spark pour utiliser cette APIhive.execute("<hive-query>")
Fermer une session Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Exécuter une requête de fusion Hive
Cette API crée une requête de fusion Hive au format ci-dessous
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Le générateur prend en charge les opérations suivantes :
mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Écrire un jeu de données dans la table Hive par lot
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName être au format
<db>.<table>
ou<table>
. Si aucun nom de base de données n’est fourni, la table est explorée/créée dans la base de données activeLes types SaveMode (Mode d’enregistrement) sont les suivants :
Append : ajoute le jeu de données à la table spécifiée
Overwrite : remplace les données de la table spécifiée par le jeu de données
Ignore : ignore l’écriture si la table existe déjà ; aucune erreur n’est générée
ErrorIfExists : génère une erreur si la table existe déjà
Écrire un jeu de données dans une table Hive à l’aide de HiveStreaming
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Notes
Les écritures en flux ajoutent toujours des données.
Écriture d’un flux Spark dans une table Hive
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()