API Hive Warehouse Connector dans Azure HDInsight
Cet article liste toutes les API prises en charge par Hive Warehouse Connector. Tous les exemples présentés ci-dessous sont exécutés à l’aide de l’interpréteur de commandes Spark et d’une session Hive Warehouse Connector.
Comment créer une session Hive Warehouse Connector :
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Configuration requise
Suivez les étapes de Configuration de Hive Warehouse Connector.
API prises en charge
Définir la base de données :
hive.setDatabase("<database-name>")
Lister toutes les bases de données :
hive.showDatabases()
Lister toutes les tables de la base de données active
hive.showTables()
Décrire une table
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Déposer une base de données
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Supprimer une table de la base de données active
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Création d'une base de données
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Créer une table dans la base de données active
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Le générateur pour la création de table prend en charge uniquement les opérations ci-dessous :
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Notes
Cette API crée une table au format ORC à l’emplacement par défaut. Pour d’autres fonctionnalités/options ou pour créer une table à l’aide de requêtes Hive, utilisez l’API
executeUpdate
.Lire une table
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Exécuter des commandes DDL sur HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw excpetion in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Exécuter une requête Hive et charger le résultat dans un jeu de données
Exécution d’une requête par le biais de démons LLAP. [Recommandé]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Exécution d’une requête par le biais de HiveServer2 via JDBC.
Définir
spark.datasource.hive.warehouse.smartExecution
surfalse
dans les configurations Spark avant de démarrer la session Spark pour utiliser cette APIhive.execute("<hive-query>")
Fermer une session Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Exécuter une requête de fusion Hive
Cette API crée une requête de fusion Hive au format ci-dessous
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Le générateur prend en charge les opérations suivantes :
mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Écrire un jeu de données dans la table Hive par lot
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName être au format
<db>.<table>
ou<table>
. Si aucun nom de base de données n’est fourni, la table est explorée/créée dans la base de données activeLes types SaveMode (Mode d’enregistrement) sont les suivants :
Append : ajoute le jeu de données à la table spécifiée
Overwrite : remplace les données de la table spécifiée par le jeu de données
Ignore : ignore l’écriture si la table existe déjà ; aucune erreur n’est générée
ErrorIfExists : génère une erreur si la table existe déjà
Écrire un jeu de données dans une table Hive à l’aide de HiveStreaming
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Notes
Les écritures en flux ajoutent toujours des données.
Écriture d’un flux Spark dans une table Hive
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de 2024, nous allons supprimer progressivement GitHub Issues comme mécanisme de commentaires pour le contenu et le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultezEnvoyer et afficher des commentaires pour