Cara menggunakan Katalog Apache Hive dengan Apache Flink® di HDInsight di AKS
Catatan
Kami akan menghentikan Azure HDInsight di AKS pada 31 Januari 2025. Sebelum 31 Januari 2025, Anda harus memigrasikan beban kerja anda ke Microsoft Fabric atau produk Azure yang setara untuk menghindari penghentian tiba-tiba beban kerja Anda. Kluster yang tersisa pada langganan Anda akan dihentikan dan dihapus dari host.
Hanya dukungan dasar yang akan tersedia hingga tanggal penghentian.
Penting
Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.
Contoh ini menggunakan Metastore Apache Hive sebagai katalog persisten dengan Apache Flink's Hive Catalog. Kami menggunakan fungsionalitas ini untuk menyimpan tabel Kafka dan metadata tabel MySQL di Flink di seluruh sesi. Flink menggunakan tabel Kafka yang terdaftar di Katalog Apache Hive sebagai sumber, melakukan beberapa pencarian dan hasil sink ke database MySQL
Prasyarat
- Kluster Apache Flink di HDInsight di AKS dengan Apache Metastore 3.1.2
- Kluster Apache Kafka di HDInsight
- Anda diharuskan untuk memastikan pengaturan jaringan selesai seperti yang dijelaskan pada Menggunakan Kafka; itu untuk memastikan HDInsight pada kluster AKS dan HDInsight berada di VNet yang sama
- MySQL 8.0.33
Apache Hive di Apache Flink
Flink menawarkan integrasi dua kali lipat dengan Apache Hive.
- Langkah pertama adalah menggunakan Hive Metastore (HMS) sebagai katalog persisten dengan HiveCatalog Flink untuk menyimpan metadata khusus Flink di seluruh sesi.
- Misalnya, pengguna dapat menyimpan tabel Kafka atau ElasticSearch mereka di Apache Hive Metastore dengan menggunakan HiveCatalog, dan menggunakannya kembali nanti dalam kueri SQL.
- Yang kedua adalah menawarkan Flink sebagai mesin alternatif untuk membaca dan menulis tabel Apache Hive.
- HiveCatalog dirancang agar "out of the box" kompatibel dengan penginstalan Apache Hive yang ada. Anda tidak perlu mengubah Apache Hive Metastore yang ada atau mengubah penempatan data atau pemartisian tabel Anda.
Untuk informasi selengkapnya, lihat Apache Hive
Persiapan lingkungan
Membuat kluster Apache Flink dengan HMS
Mari kita buat kluster Apache Flink dengan HMS pada portal Azure, Anda dapat merujuk ke instruksi terperinci tentang pembuatan kluster Flink.
Setelah pembuatan kluster, periksa HMS berjalan atau tidak di sisi AKS.
Menyiapkan topik Kafka data transaksi pesanan pengguna di HDInsight
Unduh jar klien kafka menggunakan perintah berikut:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Batalkan penargetan file tar dengan
tar -xvf kafka_2.12-3.2.0.tgz
Menghasilkan pesan ke topik Kafka.
Perintah lainnya:
Catatan
Anda diharuskan mengganti bootstrap-server dengan nama host atau IP broker kafka Anda sendiri
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Menyiapkan data master pesanan pengguna di MySQL di Azure
Pengujian DB:
Siapkan tabel pesanan:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Menggunakan pengunduhan SSH yang diperlukan konektor Kafka dan jar MySQL Database
Catatan
Unduh jar versi yang benar sesuai dengan versi kafka HDInsight dan versi MySQL kami.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Memindahkan jar perencana
Pindahkan jar flink-table-planner_2.12-1.17.0-... . jar yang terletak di pod webssh /opt to /lib dan pindahkan jar flink-table-planner-loader1.17.0-... . jar /opt/flink-webssh/opt/ dari /lib. Lihat masalah untuk detail selengkapnya. Lakukan langkah-langkah berikut untuk memindahkan jar perencana.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Catatan
Pemindahan jar perencana tambahan hanya diperlukan saat menggunakan dialek Apache Hive atau titik akhir Apache HiveServer2. Namun, ini adalah pengaturan yang direkomendasikan untuk integrasi Apache Hive.
Validasi
Gunakan bin/sql-client.sh untuk menyambungkan ke Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Membuat katalog Apache Hive dan menyambungkan ke katalog apache Hive di Flink SQL
Catatan
Karena kita sudah menggunakan kluster Flink dengan Apache Hive Metastore, tidak perlu melakukan konfigurasi tambahan apa pun.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Membuat Tabel Kafka di Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Membuat Tabel MySQL di Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Periksa tabel yang terdaftar di atas katalog Apache Hive di Flink SQL
Sink info pesanan transaksi pengguna ke dalam tabel pesanan master di MySQL di Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Periksa apakah data pesanan transaksi pengguna di Kafka ditambahkan dalam urutan tabel master di MySQL di Azure Cloud Shell
Membuat tiga pesanan pengguna lagi di Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Periksa data tabel Kafka di Flink SQL
Flink SQL> select * from kafka_user_orders;
Sisipkan product_id=104
ke dalam tabel pesanan di MySQL di Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Periksa product_id = 104
catatan ditambahkan dalam tabel pesanan di MySQL di Azure Cloud Shell
Referensi
- Apache Hive 2.1.0
- Apache, Apache Hive, Hive, Apache Flink, Flink, dan nama proyek sumber terbuka terkait adalah merek dagang dari Apache Software Foundation (ASF).