如何在 AKS 上搭配 HDInsight 上的 Apache Flink® 使用 Hive 目錄
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
此範例使用Hive的中繼存放區做為具有Apache Flink Hive目錄的持續性目錄。 我們會使用這項功能,跨會話在 Flink 上儲存 Kafka 數據表和 MySQL 數據表元數據。 Flink 使用登錄在 Hive 目錄作為來源的 Kafka 數據表,對 MySQL 資料庫執行一些查閱和接收結果
必要條件
- 使用Hive中繼存放區 3.1.2 在 AKS 上的 HDInsight 上的 Apache Flink 叢集
- HDInsight 上的 Apache Kafka 叢集
- 您必須確定網路設定已完成,如使用 Kafka 中所述;這是為了確保 AKS 和 HDInsight 叢集上的 HDInsight 位於相同的 VNet 中
- MySQL 8.0.33
Apache Flink 上的 Apache Hive
Flink 提供與 Hive 的雙折整合。
- 第一個步驟是使用Hive中繼存放區 (HMS) 作為具有 Flink 的 HiveCatalog 持續性目錄,以跨會話儲存 Flink 特定元數據。
- 例如,使用者可以使用HiveCatalog將其Kafka或ElasticSearch資料表儲存在Hive中繼存放區中,並在稍後在SQL查詢中重複使用。
- 第二個是提供 Flink 作為讀取和寫入 Hive 數據表的替代引擎。
- HiveCatalog 是設計成與現有Hive安裝相容的「現成」。 您不需要修改現有的Hive中繼存放區,或變更數據表的數據放置或分割。
如需詳細資訊,請參閱 Apache Hive
環境準備
使用 HMS 建立 Apache Flink 叢集
讓我們在 Azure 入口網站 上使用 HMS 建立 Apache Flink 叢集,您可以參考 Flink 叢集建立的詳細指示。
建立叢集之後,請檢查 HMS 是否在 AKS 端執行。
準備 HDInsight 上的使用者訂單事務數據 Kafka 主題
使用下列命令下載 kafka 用戶端 jar:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
使用 取消 tar 檔案的Tartar 檔案
tar -xvf kafka_2.12-3.2.0.tgz
產生 Kafka 主題的訊息。
其他命令:
注意
您必須將 bootstrap-server 取代為您自己的 kafka 訊息代理程式主機名或 IP
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
在 Azure 上的 MySQL 上準備用戶訂單主要數據
測試 DB:
準備訂單資料表:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
使用 SSH 下載所需的 Kafka 連接器和 My SQL 資料庫 jar
注意
根據 HDInsight kafka 版本和 MySQL 版本下載正確的版本 jar。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
移動規劃工具 jar
移動 jar flink-table-planner_2.12-1.17.0-..。jar 位於 webssh Pod 的 /opt 至 /lib,然後移出 jar flink-table-planner-loader1.17.0-..。jar /opt/flink-webssh/opt/ from /lib. 如需詳細資訊, 請參閱問題 。 執行下列步驟來移動規劃工具 jar。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
注意
只有在使用Hive方言或HiveServer2端點時,才需要額外的規劃工具 jar 移動。 不過,這是Hive整合的建議設定。
驗證
使用 bin/sql-client.sh 連線到 Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
建立Hive目錄並連線到 Flink SQL 上的 Hive 目錄
注意
由於我們已經搭配Hive中繼存放區使用 Flink 叢集,因此不需要執行任何其他設定。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
在 Apache Flink SQL 上建立 Kafka 數據表
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
在 Apache Flink SQL 上建立 MySQL 數據表
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
檢查 Flink SQL 上 Hive 目錄上註冊的數據表
將使用者交易訂單資訊接收至 Flink SQL 上 MySQL 的主要訂單數據表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
檢查 Kafka 上的使用者交易訂單數據是否以 Azure Cloud Shell 上的 MySQL 主要數據表順序新增
在 Kafka 上建立另外三個用戶訂單
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
檢查 Flink SQL 上的 Kafka 數據表數據
Flink SQL> select * from kafka_user_orders;
在 Flink SQL 上的 MySQL 上插入 product_id=104
訂單數據表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
在 Azure Cloud Shell 上的 MySQL 上,會依順序新增檢查 product_id = 104
記錄
參考
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink 和相關聯的 開放原始碼 項目名稱是 Apache Software Foundation (ASF) 的商標。
意見反映
https://aka.ms/ContentUserFeedback。
即將推出:我們會在 2024 年淘汰 GitHub 問題,並以全新的意見反應系統取代並作為內容意見反應的渠道。 如需更多資訊,請參閱:提交及檢視以下的意見反映: