分享方式:


如何在 AKS 上搭配 HDInsight 上的 Apache Flink® 使用 Hive 目錄

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

此範例使用Hive的中繼存放區做為具有Apache Flink Hive目錄的持續性目錄。 我們會使用這項功能,跨會話在 Flink 上儲存 Kafka 數據表和 MySQL 數據表元數據。 Flink 使用登錄在 Hive 目錄作為來源的 Kafka 數據表,對 MySQL 資料庫執行一些查閱和接收結果

必要條件

Flink 提供與 Hive 的雙折整合。

  • 第一個步驟是使用Hive中繼存放區 (HMS) 作為具有 Flink 的 HiveCatalog 持續性目錄,以跨會話儲存 Flink 特定元數據。
    • 例如,使用者可以使用HiveCatalog將其Kafka或ElasticSearch資料表儲存在Hive中繼存放區中,並在稍後在SQL查詢中重複使用。
  • 第二個是提供 Flink 作為讀取和寫入 Hive 數據表的替代引擎。
  • HiveCatalog 是設計成與現有Hive安裝相容的「現成」。 您不需要修改現有的Hive中繼存放區,或變更數據表的數據放置或分割。

如需詳細資訊,請參閱 Apache Hive

環境準備

讓我們在 Azure 入口網站 上使用 HMS 建立 Apache Flink 叢集,您可以參考 Flink 叢集建立的詳細指示

顯示如何建立 Flink 叢集的螢幕快照。

建立叢集之後,請檢查 HMS 是否在 AKS 端執行。

顯示如何在 Flink 叢集中檢查 HMS 狀態的螢幕快照。

準備 HDInsight 上的使用者訂單事務數據 Kafka 主題

使用下列命令下載 kafka 用戶端 jar:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

使用 取消 tar 檔案的Tartar 檔案

tar -xvf kafka_2.12-3.2.0.tgz

產生 Kafka 主題的訊息。

顯示如何將訊息產生至 Kafka 主題的螢幕快照。

其他命令:

注意

您必須將 bootstrap-server 取代為您自己的 kafka 訊息代理程式主機名或 IP

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

在 Azure 上的 MySQL 上準備用戶訂單主要數據

測試 DB:

顯示如何在 Kafka 中測試資料庫的螢幕快照。

顯示如何在入口網站上執行 Cloud Shell 的螢幕快照。

準備訂單資料表:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

使用 SSH 下載所需的 Kafka 連接器和 My SQL 資料庫 jar

注意

根據 HDInsight kafka 版本和 MySQL 版本下載正確的版本 jar。

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

移動規劃工具 jar

移動 jar flink-table-planner_2.12-1.17.0-..jar 位於 webssh Pod 的 /opt 至 /lib,然後移出 jar flink-table-planner-loader1.17.0-..jar /opt/flink-webssh/opt/ from /lib. 如需詳細資訊, 請參閱問題 。 執行下列步驟來移動規劃工具 jar。

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

注意

只有在使用Hive方言或HiveServer2端點時,才需要額外的規劃工具 jar 移動。 不過,這是Hive整合的建議設定。

驗證

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

注意

由於我們已經搭配Hive中繼存放區使用 Flink 叢集,因此不需要執行任何其他設定。

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

顯示如何建立 Kafka 數據表的螢幕快照。

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

顯示如何建立 mysql 數據表的螢幕快照。

顯示資料表輸出的螢幕快照。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

顯示如何接收使用者交易的螢幕快照。

顯示 Flink UI 的螢幕快照。

檢查 Kafka 上的使用者交易訂單數據是否以 Azure Cloud Shell 上的 MySQL 主要數據表順序新增

顯示如何檢查使用者交易的螢幕快照。

在 Kafka 上建立另外三個用戶訂單

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

顯示如何檢查 Kafka 資料表數據的螢幕快照。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

顯示如何檢查訂單數據表的螢幕快照。

在 Azure Cloud Shell 上的 MySQL 上,會依順序新增檢查 product_id = 104 記錄

顯示新增至訂單數據表之記錄的螢幕快照。

參考