如何在 HDInsight on AKS 上搭配 Apache Flink® 使用 Hive 目錄
注意
AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。
在淘汰日期之前,只有基本支援可用。
重要
此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。
此範例使用 Hive 的中繼存放區作為持續性目錄 (包含 Apache Flink 的 Hive 目錄)。 我們會使用這項功能,跨工作階段在 Flink 上儲存 Kafka 資料表和 MySQL 資料表中繼資料。 Flink 會使用 Hive 目錄中註冊為來源的 Kafka 資料表,執行一些查閱並將結果接收至 MySQL 資料庫
必要條件
- HDInsight on AKS 上採用 Hive Metastore 3.1.2 的 Apache Flink 叢集
- HDInsight 上的 Apache Kafka 叢集
- 您必須確保如使用 Kafka 所述完成網路設定;以確保 HDInsight on AKS 和 HDInsight 叢集位於相同的 VNet 中
- MySQL 8.0.33
Apache Flink 上的 Apache Hive
Flink 提供與 Hive 的雙重整合。
- 第一個步驟是使用 Hive 中繼存放區 (HMS) 作為具有 Flink 的 HiveCatalog 的持續性目錄,以便跨工作階段儲存 Flink 特定中繼資料。
- 例如,使用者可使用 HiveCatalog 將其 Kafka 或 ElasticSearch 資料表儲存在 Hive 中繼存放區,而稍後在 SQL 查詢中予以重複使用。
- 第二個步驟是提供 Flink 作為讀取和寫入 Hive 資料表的替代引擎。
- HiveCatalog 的設計訴求是要與現有 Hive 安裝立即相容。 您不需要修改現有的 Hive 中繼存放區,或變更資料表的資料放置或分割。
如需詳細資訊,請參閱 Apache Hive
環境準備
使用 HMS 建立 Apache Flink 叢集
若要在 Azure 入口網站上建立具有 HMS 的 Apache Flink 叢集,請參閱建立 Flink 叢集的詳細指示。
建立叢集之後,請檢查 HMS 是否在 AKS 端執行。
在 HDInsight 上準備使用者訂單交易資料 Kafka 主題
使用下列命令下載 kafka 用戶端 jar:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
使用下列命令解壓縮 tar 檔案:
tar -xvf kafka_2.12-3.2.0.tgz
產生 Kafka 主題的訊息。
其他命令:
注意
您必須將 bootstrap-server 取代為自己的 kafka 訊息代理程式主機名稱或 IP
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
在 Azure 的 MySQL 上準備使用者訂單主要資料
Testing DB:
準備訂單資料表:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
使用 SSH 下載所需的 Kafka 連接器和 MySQL 資料庫 jar
注意
根據 HDInsight kafka 版本和 MySQL 版本下載正確的版本 jar。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
移動規劃工具 jar
將位於 webssh pod 之 /opt 的 JAR flink-table-planner_2.12-1.17.0-....jar 移至 /lib,然後將 JAR flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ 從 /lib 移出。 如需詳細資料,請參閱問題。 執行下列步驟來移動規劃工具 jar。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
注意
只有在使用 Hive 方言或 HiveServer2 端點時才需要額外的規劃工具 jar 移動。 不過,這是 Hive 整合的建議設定。
驗證
使用 bin/sql-client.sh 連線到 Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
建立 Hive 目錄並連線到 Flink SQL 上的 Hive 目錄
注意
由於我們已經搭配 Hive 中繼存放區使用 Flink 叢集,因此不需要執行任何其他設定。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
在 Apache Flink SQL 上建立 Kafka 資料表
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
在 Apache Flink SQL 上建立 MySQL 資料表
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
檢查 Flink SQL 上 Hive 目錄中註冊的資料表
將使用者交易訂單資訊接收至 Flink SQL 上 MySQL 的主要訂單資料表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
檢查 Kafka 上的使用者交易訂單資料是否在 Azure Cloud Shell 上 MySQL 的主要資料表訂單中新增
在 Kafka 上建立另外三個使用者訂單
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
檢查 Flink SQL 上的 Kafka 資料表資料
Flink SQL> select * from kafka_user_orders;
將 product_id=104
插入 Flink SQL 上 MySQL 的訂單資料表中
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
檢查 product_id = 104
記錄是否在 Azure Cloud Shell 上 MySQL 的訂單資料表中新增
參考
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 Apache Software Foundation (ASF) 的商標。