Usare metastore Hive con l'API DataStream Apache Flink®
Nota
Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.
Solo il supporto di base sarà disponibile fino alla data di ritiro.
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.
Nel corso degli anni, Hive Metastore si è evoluto in un centro di metadati de facto nell'ecosistema Hadoop. Molte aziende hanno un'istanza separata del servizio Metastore Hive nei propri ambienti di produzione per gestire tutti i metadati (metadati Hive o non Hive). Per gli utenti con distribuzioni Hive e Flink, HiveCatalog consente loro di usare Metastore Hive per gestire i metadati di Flink.
Versioni Hive supportate per cluster Apache Flink in HDInsight su AKS
Versione di Hive supportata:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
Se si sta creando un programma personalizzato, sono necessarie le dipendenze seguenti nel file mvn. Non è consigliabile includere queste dipendenze nel file JAR risultante. È consigliabile aggiungere dipendenze in fase di esecuzione.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Connettersi a Hive
Questo esempio illustra vari frammenti di codice di connessione a Hive, usando Apache Flink in HDInsight su AKS, è necessario usare /opt/hive-conf
come directory di configurazione hive per connettersi al metastore Hive
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
tableEnv.from("MyTable").execute().print();
// 4. run stream
env.execute("Hive Demo on Flink");
}
}
Nel pod Webssh spostare il file JAR di Planner
Spostare il flink-table-planner-loader-1.17.0-*.*.*.jar
jar che si trova nel /opt to /lib
del pod Webssh e spostare il file JAR flink-table-planner-loader-1.17.0-*.*.*.jar
da lib
. Per altri dettagli, vedere il problema. Per spostare il file JAR di pianificazione, seguire questa procedura.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
Nota
Lo spostamento di un file JAR aggiuntivo di pianificazione è necessario solo quando si usa il dialetto Hive o l'endpoint HiveServer2. Tuttavia, si tratta della configurazione consigliata per l'integrazione di Hive.
Per altre informazioni, vedere Come usare Hive Catalog con Apache Flink® in HDInsight su AKS
Creare il pacchetto JAR, caricarlo in Webssh ed eseguirlo
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
2 rows in set
Controllare il processo in esecuzione nell'interfaccia utente Flink
Controllare la tabella nell'interfaccia utente Webssh tramite sql-client.sh
user@sshnode-0 [ ~ ]$ bin/sql-client.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
????????
????????????????
??????? ??????? ?
???? ????????? ?????
??? ??????? ?????
??? ??? ?????
?? ???????????????
?? ? ??? ?????? ?????
????? ???? ????? ?????
??????? ??? ??????? ???
????????? ?? ?? ??????????
???????? ?? ? ?? ???????
???? ??? ? ?? ???????? ?????
???? ? ?? ? ?? ???????? ???? ??
???? ???? ?????????? ??? ?? ????
???? ?? ??? ??????????? ???? ? ? ???
??? ?? ??? ????????? ???? ???
?? ? ??????? ???????? ??? ??
??? ??? ???????????????????? ???? ?
????? ??? ?????? ???????? ???? ??
???????? ??????????????? ??
?? ???? ??????? ??? ?????? ?? ???
??? ??? ??? ??????? ???? ?????????????
??? ????? ???? ?? ?? ???? ???
?? ??? ? ?? ?? ??
?? ?? ?? ?? ????????
?? ????? ?? ??????????? ??
?? ???? ? ??????? ??
??? ????? ?? ???????????
???? ???? ??????? ????????
????? ?? ???? ?????
????????????????????????????????? ?????
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
Flink SQL> CREATE CATALOG myhive WITH (
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
| mytable |
| myview |
+------------+
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
Received a total of 2 rows
Riferimenti
- Lettura e scrittura Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).