Dialeto do Hive em clusters do Apache Flink® no Azure HDInsight no AKS

Importante

Esse recurso está atualmente na visualização. Os Termos de Uso Complementares para Versões Prévias da Microsoft Azure incluem termos legais adicionais que se aplicam aos recursos do Azure que estão em versão beta, em versão de teste ou ainda não lançados para disponibilidade geral. Para informações sobre essa visualização específica, consulte o artigo Versão prévia do HDInsight no AKS. Para obter perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre a Comunidade do Azure HDInsight

Neste artigo, saiba como usar o dialeto do Hive em clusters do Apache Flink no Azure HDInsight no AKS.

Introdução

O usuário não pode alterar o dialeto padrão flink para o dialeto do Hive para seu uso no Azure HDInsight em clusters do AKS. Todas as operações SQL falham uma vez alteradas para o dialeto hive com o erro a seguir.


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class java.net.URLClassLoader*

O motivo para esse problema surge devido a um Hive Jira aberto. Atualmente, o Hive pressupõe que o carregador de classe do sistema seja uma instância do URLClassLoader. Em Java 11, essa suposição não se aplica.

  • Execute as seguintes etapas no webssh:

    1. Remover o flink-sql-connector-hive*jar existente no local do lib

      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. Baixe o jar abaixo no pod webssh e adicione-o no /opt/flink-webssh/lib wget https://aka.ms/hdiflinkhivejdk11jar. (O jar de hive acima tem a correção https://issues.apache.org/jira/browse/HIVE-27508)

    3. mv $FLINK_HOME/opt/flink-table-planner_2.12-1.16.0-0.0.18.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.16.0-0.0.18.jar
      
    4. mv $FLINK_HOME/lib/flink-table-planner-loader-1.16.0-0.0.18.jar $FLINK_HOME/opt/flink-table-planner-loader-1.16.0-0.0.18.jar
      
    5. Adicione as seguintes chaves no gerenciamento de configuração flink na seção core-site.xml:

      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • Aqui está uma visão geral das consultas hive-dialeto

    • Executando o dialeto do Hive no Flink sem particionamento
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    Os dados são gravados no mesmo contêiner configurado no diretório hive/warehouse.

    Screenshot shows container table 1.

    • Executando o dialeto do Hive em Flink com partições
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

Screenshot shows container table 2.

Screenshot shows container table 3.

Referência