Rozhraní API hive Warehouse Connectoru 2.0 ve službě Azure HDInsight
Tento článek obsahuje seznam všech rozhraní API podporovaných konektorem skladu Hive 2.0. Všechny uvedené příklady ukazují, jak spustit pomocí spark-shellu a relace konektoru Hive Warehouse Connector.
Vytvoření relace konektoru skladu Hive:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Požadavek
Dokončete kroky nastavení Hive Warehouse Connectoru .
Podporovaná rozhraní API
Nastavte databázi:
hive.setDatabase("<database-name>")
Vypsat všechny databáze:
hive.showDatabases()
Výpis všech tabulek v aktuální databázi
hive.showTables()
Popis tabulky
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Odstranění databáze
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Přetažení tabulky v aktuální databázi
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Vytvoření databáze
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Vytvoření tabulky v aktuální databázi
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Tvůrce pro create-table podporuje pouze následující operace:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Poznámka
Toto rozhraní API vytvoří tabulku ve formátu ORC ve výchozím umístění. Pokud chcete získat další funkce nebo možnosti nebo vytvořit tabulku pomocí dotazů Hive, použijte
executeUpdate
rozhraní API.Čtení z tabulky
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Spouštění příkazů DDL na HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw excpetion in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Spuštění dotazu Hive a výsledek načtení v datové sadě
Provádění dotazu prostřednictvím démonů LLAP. [Doporučeno]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Provádění dotazu prostřednictvím HiveServer2 prostřednictvím JDBC.
Před spuštěním relace Sparku nastavte
spark.datasource.hive.warehouse.smartExecution
na nafalse
, aby se toto rozhraní API používalo.hive.execute("<hive-query>")
Zavření relace konektoru skladu Hive
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Spuštění dotazu hive merge
Toto rozhraní API vytvoří slučovací dotaz Hive ve formátu .
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Tvůrce podporuje následující operace:
mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Zápis datové sady do tabulky Hive v dávce
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName by měl být ve formátu
<db>.<table>
nebo<table>
. Pokud není zadaný žádný název databáze, bude tabulka prohledána nebo vytvořena v aktuální databázi.Typy SaveMode jsou:
Připojit: Připojí datovou sadu k dané tabulce.
Přepsání: Přepíše data v dané tabulce datovou sadou.
Ignorovat: Přeskočí zápis, pokud tabulka už existuje, nedochází k žádné chybě.
ErrorIfExists: Vyvolá chybu, pokud tabulka již existuje.
Zápis datové sady do tabulky Hive pomocí HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Poznámka
Stream vždy zapisuje data.
Zápis streamu Sparku do tabulky Hive
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()
Další kroky
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro