Bagikan melalui


Cara menggunakan Katalog Apache Hive dengan Apache Flink® di HDInsight di AKS

Penting

Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.

Contoh ini menggunakan Metastore Apache Hive sebagai katalog persisten dengan Apache Flink's Hive Catalog. Kami menggunakan fungsionalitas ini untuk menyimpan tabel Kafka dan metadata tabel MySQL di Flink di seluruh sesi. Flink menggunakan tabel Kafka yang terdaftar di Katalog Apache Hive sebagai sumber, melakukan beberapa pencarian dan hasil sink ke database MySQL

Prasyarat

Flink menawarkan integrasi dua kali lipat dengan Apache Hive.

  • Langkah pertama adalah menggunakan Hive Metastore (HMS) sebagai katalog persisten dengan HiveCatalog Flink untuk menyimpan metadata khusus Flink di seluruh sesi.
    • Misalnya, pengguna dapat menyimpan tabel Kafka atau ElasticSearch mereka di Apache Hive Metastore dengan menggunakan HiveCatalog, dan menggunakannya kembali nanti dalam kueri SQL.
  • Yang kedua adalah menawarkan Flink sebagai mesin alternatif untuk membaca dan menulis tabel Apache Hive.
  • HiveCatalog dirancang agar "out of the box" kompatibel dengan penginstalan Apache Hive yang ada. Anda tidak perlu mengubah Apache Hive Metastore yang ada atau mengubah penempatan data atau pemartisian tabel Anda.

Untuk informasi selengkapnya, lihat Apache Hive

Persiapan lingkungan

Mari kita buat kluster Apache Flink dengan HMS pada portal Azure, Anda dapat merujuk ke instruksi terperinci tentang pembuatan kluster Flink.

Cuplikan layar memperlihatkan cara membuat kluster Flink.

Setelah pembuatan kluster, periksa HMS berjalan atau tidak di sisi AKS.

Cuplikan layar memperlihatkan cara memeriksa status HMS di kluster Flink.

Menyiapkan topik Kafka data transaksi pesanan pengguna di HDInsight

Unduh jar klien kafka menggunakan perintah berikut:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Batalkan penargetan file tar dengan

tar -xvf kafka_2.12-3.2.0.tgz

Menghasilkan pesan ke topik Kafka.

Cuplikan layar memperlihatkan cara menghasilkan pesan ke topik Kafka.

Perintah lainnya:

Catatan

Anda diharuskan mengganti bootstrap-server dengan nama host atau IP broker kafka Anda sendiri

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Menyiapkan data master pesanan pengguna di MySQL di Azure

Pengujian DB:

Cuplikan layar memperlihatkan cara menguji database di Kafka.

Cuplikan layar memperlihatkan cara menjalankan Cloud Shell di portal.

Siapkan tabel pesanan:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Menggunakan pengunduhan SSH yang diperlukan konektor Kafka dan jar MySQL Database

Catatan

Unduh jar versi yang benar sesuai dengan versi kafka HDInsight dan versi MySQL kami.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Memindahkan jar perencana

Pindahkan jar flink-table-planner_2.12-1.17.0-... . jar yang terletak di pod webssh /opt to /lib dan pindahkan jar flink-table-planner-loader1.17.0-... . jar /opt/flink-webssh/opt/ dari /lib. Lihat masalah untuk detail selengkapnya. Lakukan langkah-langkah berikut untuk memindahkan jar perencana.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Catatan

Pemindahan jar perencana tambahan hanya diperlukan saat menggunakan dialek Apache Hive atau titik akhir Apache HiveServer2. Namun, ini adalah pengaturan yang direkomendasikan untuk integrasi Apache Hive.

Validasi

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Catatan

Karena kita sudah menggunakan kluster Flink dengan Apache Hive Metastore, tidak perlu melakukan konfigurasi tambahan apa pun.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Cuplikan layar memperlihatkan cara membuat tabel Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Cuplikan layar memperlihatkan cara membuat tabel mysql.

Cuplikan layar memperlihatkan output tabel.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Cuplikan layar memperlihatkan cara melakukan sink pada transaksi pengguna.

Cuplikan layar memperlihatkan Flink UI.

Periksa apakah data pesanan transaksi pengguna di Kafka ditambahkan dalam urutan tabel master di MySQL di Azure Cloud Shell

Cuplikan layar memperlihatkan cara memeriksa transaksi pengguna.

Membuat tiga pesanan pengguna lagi di Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Cuplikan layar memperlihatkan cara memeriksa data tabel Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Cuplikan layar memperlihatkan cara memeriksa tabel pesanan.

Periksa product_id = 104 catatan ditambahkan dalam tabel pesanan di MySQL di Azure Cloud Shell

Cuplikan layar memperlihatkan rekaman yang ditambahkan ke tabel pesanan.

Referensi