Comment utiliser un catalogue Hive avec Apache Flink® sur HDInsight sur AKS
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base sera disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.
Cet exemple utilise le Metastore de Hive comme catalogue persistant avec le catalogue Hive d’Apache Flink. Nous utilisons cette fonctionnalité pour stocker les métadonnées des tables Kafka et MySQL sur Flink au fil des sessions. Flink utilise la table Kafka enregistrée dans le catalogue Hive comme source, effectue une recherche et envoie le résultat dans la base de données MySQL
Prérequis
- Cluster Apache Flink sur HDInsight sur AKS avec un metastore Hive 3.1.2
- Cluster Apache Kafka sur HDInsight
- Vous devez vous assurer que les paramètres réseau sont complets, comme décrit dans Utilisation de Kafka. Ceci vise à s’assurer de la présence des clusters HDInsight sur AKS et HDInsight dans le même réseau virtuel
- MySQL 8.0.33
Apache Hive sur Apache Flink
Flink propose une double intégration avec Hive.
- La première étape consiste à utiliser metastore Hive (HMS) comme catalogue persistant avec HiveCatalog de Flink pour stocker les métadonnées spécifiques à Flink au fil des sessions.
- Par exemple, les utilisateurs peuvent stocker leurs tables Kafka ou ElasticSearch dans metastore Hive à l'aide de HiveCatalog et les réutiliser ultérieurement dans des requêtes SQL.
- La seconde consiste à proposer Flink comme moteur alternatif de lecture et d’écriture de tables Hive.
- Le HiveCatalog est conçu pour être « prêt à l’emploi » compatible avec les installations Hive existantes. Vous n'avez pas besoin de modifier votre metastore Hive existant ni de modifier le placement des données ou le partitionnement de vos tables.
Pour plus d’informations, consultez Apache Hive
Préparation de l’environnement
Créer un cluster Apache Flink avec HMS
Créons un cluster Apache Flink avec HMS sur le Portail Microsoft Azure, vous pouvez vous référer aux instructions détaillées sur la création de cluster Flink.
Après la création du cluster, vérifiez que HMS est en cours d'exécution ou non du côté AKS.
Préparer les données de transaction de commande utilisateur, sujet Kafka sur HDInsight
Téléchargez le fichier jar du client Kafka à l'aide de la commande suivante :
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Décompressez le fichier tar avec
tar -xvf kafka_2.12-3.2.0.tgz
Produisez les messages sur le sujet Kafka.
Autres commandes :
Remarque
Vous devez remplacer le serveur d'amorçage par le nom d'hôte ou l'adresse IP de votre propre courtier Kafka
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Préparer les données principales des commandes utilisateur sur MySQL sur Azure
Base de données de test :
Préparez le tableau de commande :
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
L'utilisation du téléchargement SSH nécessite le connecteur Kafka et les fichiers jar de la base de données MySQL
Remarque
Téléchargez la version jar correcte selon notre version HDInsight kafka et notre version MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Déplacer le pot du planificateur
Déplacez le fichier jar flink-table-planner_2.12-1.17.0-....jar situé dans le pod webssh /opt to /lib et déplacez le fichier jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ à partir de /lib. Rapportez-vous au problème pour plus de détails. Effectuez les étapes suivantes pour déplacer le pot du planificateur.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Remarque
Un déplacement de pot de planificateur supplémentaire n'est nécessaire que lors de l'utilisation du dialecte Hive ou du point de terminaison HiveServer2.. Cependant, il s'agit de la configuration recommandée pour l'intégration de Hive.
Validation
Utilisez bin/sql-client.sh pour vous connecter à Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Créez un catalogue Hive et connectez-vous au catalogue Hive sur Flink SQL
Remarque
Comme nous utilisons déjà le cluster Flink avec metastore Hive, il n'est pas nécessaire d'effectuer de configurations supplémentaires.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Créer une table Kafka sur Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Créer une table MySQL sur Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Vérifiez les tables enregistrées dans le catalogue Hive ci-dessus sur Flink SQL
Incorporer les informations de commande de transaction utilisateur dans la table de commande principale dans MySQL sur Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Vérifiez si les données de commande des transactions utilisateur sur Kafka sont ajoutées dans l'ordre de la table principale dans MySQL sur Azure Cloud Shell
Création de trois commandes utilisateur supplémentaires sur Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Vérifiez les données de la table Kafka sur Flink SQL
Flink SQL> select * from kafka_user_orders;
Insérer product_id=104
dans la table des commandes sur MySQL sur Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
L'enregistrement de product_id = 104
contrôle est ajouté dans la table de commande sur MySQL sur Azure Cloud Shell
Référence
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).