共用方式為


如何在 HDInsight on AKS 上搭配 Apache Flink® 使用 Hive 目錄

注意

AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。

在淘汰日期之前,只有基本支援可用。

重要

此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。

此範例使用 Hive 的中繼存放區作為持續性目錄 (包含 Apache Flink 的 Hive 目錄)。 我們會使用這項功能,跨工作階段在 Flink 上儲存 Kafka 資料表和 MySQL 資料表中繼資料。 Flink 會使用 Hive 目錄中註冊為來源的 Kafka 資料表,執行一些查閱並將結果接收至 MySQL 資料庫

必要條件

Flink 提供與 Hive 的雙重整合。

  • 第一個步驟是使用 Hive 中繼存放區 (HMS) 作為具有 Flink 的 HiveCatalog 的持續性目錄,以便跨工作階段儲存 Flink 特定中繼資料。
    • 例如,使用者可使用 HiveCatalog 將其 Kafka 或 ElasticSearch 資料表儲存在 Hive 中繼存放區,而稍後在 SQL 查詢中予以重複使用。
  • 第二個步驟是提供 Flink 作為讀取和寫入 Hive 資料表的替代引擎。
  • HiveCatalog 的設計訴求是要與現有 Hive 安裝立即相容。 您不需要修改現有的 Hive 中繼存放區,或變更資料表的資料放置或分割。

如需詳細資訊,請參閱 Apache Hive

環境準備

若要在 Azure 入口網站上建立具有 HMS 的 Apache Flink 叢集,請參閱建立 Flink 叢集的詳細指示。

顯示如何建立 Flink 叢集的螢幕擷取畫面。

建立叢集之後,請檢查 HMS 是否在 AKS 端執行。

顯示如何在 Flink 叢集中檢查 HMS 狀態的螢幕擷取畫面。

在 HDInsight 上準備使用者訂單交易資料 Kafka 主題

使用下列命令下載 kafka 用戶端 jar:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

使用下列命令解壓縮 tar 檔案:

tar -xvf kafka_2.12-3.2.0.tgz

產生 Kafka 主題的訊息。

顯示如何將訊息產生至 Kafka 主題的螢幕擷取畫面。

其他命令:

注意

您必須將 bootstrap-server 取代為自己的 kafka 訊息代理程式主機名稱或 IP

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

在 Azure 的 MySQL 上準備使用者訂單主要資料

Testing DB:

顯示如何在 Kafka 中測試資料庫的螢幕擷取畫面。

顯示如何在入口網站上執行 Cloud Shell 的螢幕擷取畫面。

準備訂單資料表:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

使用 SSH 下載所需的 Kafka 連接器和 MySQL 資料庫 jar

注意

根據 HDInsight kafka 版本和 MySQL 版本下載正確的版本 jar。

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

移動規劃工具 jar

將位於 webssh pod 之 /opt 的 JAR flink-table-planner_2.12-1.17.0-....jar 移至 /lib,然後將 JAR flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ 從 /lib 移出。 如需詳細資料,請參閱問題。 執行下列步驟來移動規劃工具 jar。

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

注意

只有在使用 Hive 方言或 HiveServer2 端點時才需要額外的規劃工具 jar 移動。 不過,這是 Hive 整合的建議設定。

驗證

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

注意

由於我們已經搭配 Hive 中繼存放區使用 Flink 叢集,因此不需要執行任何其他設定。

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

顯示如何建立 Kafka 資料表的螢幕擷取畫面。

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

顯示如何建立 MySQL 資料表的螢幕擷取畫面。

顯示資料表輸出的螢幕擷取畫面。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

顯示如何接收使用者交易的螢幕擷取畫面。

顯示 Flink UI 的螢幕擷取畫面。

檢查 Kafka 上的使用者交易訂單資料是否在 Azure Cloud Shell 上 MySQL 的主要資料表訂單中新增

顯示如何檢查使用者交易的螢幕擷取畫面。

在 Kafka 上建立另外三個使用者訂單

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

顯示如何檢查 Kafka 資料表資料的螢幕擷取畫面。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

顯示如何檢查訂單資料表的螢幕擷取畫面。

檢查 product_id = 104 記錄是否在 Azure Cloud Shell 上 MySQL 的訂單資料表中新增

顯示新增至訂單資料表之記錄的螢幕擷取畫面。

參考