Bagikan melalui


Dialek Apache Hive di kluster Apache Flink® pada HDInsight di AKS

Catatan

Kami akan menghentikan Azure HDInsight di AKS pada 31 Januari 2025. Sebelum 31 Januari 2025, Anda harus memigrasikan beban kerja anda ke Microsoft Fabric atau produk Azure yang setara untuk menghindari penghentian tiba-tiba beban kerja Anda. Kluster yang tersisa pada langganan Anda akan dihentikan dan dihapus dari host.

Hanya dukungan dasar yang akan tersedia hingga tanggal penghentian.

Penting

Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.

Dalam artikel ini, pelajari cara menggunakan dialek Apache Flink di AKS.

Pendahuluan

Pengguna tidak dapat mengubah dialek default flink menjadi dialek apache hive untuk penggunaan mereka pada HDInsight pada kluster AKS. Semua operasi SQL gagal setelah diubah menjadi dialek apache hive dengan kesalahan berikut.


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

Alasan masalah ini muncul karena Apache Hive Jira terbuka. Saat ini, Apache Hive mengasumsikan bahwa pemuat kelas sistem adalah instans URLClassLoader. Dalam Java 11, asumsi ini tidak terjadi.

  • Jalankan langkah-langkah berikut di webssh:

    1. Hapus flink-sql-connector-hive*jar yang ada di lokasi lib
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. Unduh jar berikut dalam pod dan tambahkan di webssh bawah /opt/flink-webssh/lib wget https://aka.ms/hdiflinkhivejdk11jar. (Jar sarang di atas memiliki perbaikan https://issues.apache.org/jira/browse/HIVE-27508)
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    
    1. Tambahkan kunci berikut dalam manajemen konfigurasi di flink bawah bagian core-site.xml:
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • Berikut adalah gambaran umum kueri apache hive-dialect

    • Mengeksekusi dialek Apache Hive di Flink tanpa pemartisian
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    Data ditulis dalam kontainer yang sama yang dikonfigurasi di direktori apache hive/gudang.

    Cuplikan layar memperlihatkan tabel kontainer 1.

    • Mengeksekusi dialek Apache Hive di Flink dengan partisi
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

Cuplikan layar memperlihatkan tabel kontainer 2.

Cuplikan layar memperlihatkan tabel kontainer 3.

Referensi