Dela via


Hive-dialekt i Apache Flink-kluster® i HDInsight på AKS

Kommentar

Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.

Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.

Viktigt!

Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.

I den här artikeln lär du dig hur du använder Hive-dialekt i Apache Flink-kluster i HDInsight på AKS.

Introduktion

Användaren kan inte ändra standarddialekten flink till hive-dialekt för deras användning i HDInsight i AKS-kluster. Alla SQL-åtgärder misslyckas när de har ändrats till hive-dialekt med följande fel.


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

Orsaken till det här problemet uppstår på grund av en öppen Hive Jira. För närvarande förutsätter Hive att systemklassinläsaren är en instans av URLClassLoader. I Java 11är det här antagandet inte fallet.

  • Kör följande steg i webssh:

    1. Ta bort den befintliga flink-sql-connector-hive*jar på lib-platsen
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. Ladda ned följande jar-fil i webssh podden och lägg till den under /opt/flink-webssh/lib wget https://aka.ms/hdiflinkhivejdk11jar. (Ovanstående hive jar har korrigeringen https://issues.apache.org/jira/browse/HIVE-27508)
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    
    1. Lägg till följande nycklar i konfigurationshanteringen flink under core-site.xml avsnittet:
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • Här är en översikt över hive-dialektfrågor

    • Köra Hive-dialekt i Flink utan partitionering
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    Data skrivs i samma container som konfigurerats i katalogen hive/warehouse.

    Skärmbild som visar containertabell 1.

    • Köra Hive-dialekt i Flink med partitioner
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

Skärmbild som visar containertabell 2.

Skärmbild som visar containertabell 3.

Referens