Uso del catálogo de Hive con Apache Flink® en HDInsight en AKS
Nota:
Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.
Solo el soporte técnico básico estará disponible hasta la fecha de retirada.
Importante
Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envía una solicitud en AskHDInsight con los detalles y síguenos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
En este ejemplo se usa Metastore de Hive como catálogo persistente con el catálogo de Hive de Apache Flink’s. Usamos esta funcionalidad para almacenar tablas Kafka y metadatos de tablas MySQL en Flink entre sesiones. Flink usa la tabla Kafka registrada en el catálogo de Hive como origen, realizar algunos resultados de búsqueda y receptor en la base de datos MySQL
Requisitos previos
- Clúster de Apache Flink en HDInsight en AKS con Metastore de Hive 3.1.2
- Clúster de Apache Kafka en HDInsight
- Debes asegurarte de que los valores de red están completos tal y como se describe enUso de Kafka; es decir, asegúrese de que HDInsight en clústeres de AKS y HDInsight estén en la misma red virtual
- MySQL 8.0.33
Apache Hive en Apache Flink
Flink ofrece una integración de dos plegamientos con Hive.
- El primer paso es usar Metastore Hive (HMS) como catálogo persistente con HiveCatalog de Flink para almacenar metadatos específicos de Flink entre sesiones.
- Por ejemplo, los usuarios pueden almacenar sus tablas de Kafka o ElasticSearch en Metastore de Hive mediante HiveCatalog y reutilizarlas más adelante en consultas SQL.
- El segundo es ofrecer Flink como un motor alternativo para leer y escribir tablas de Hive.
- HiveCatalog está diseñado para ser compatible con las instalaciones existentes de Hive. No es necesario modificar el Metastore de Hive existente ni cambiar la ubicación de datos ni la creación de particiones de las tablas.
Para más información, consulte Apache Hive
Preparación del entorno
Creación de un clúster Apache Flink con HMS
Permite crear un clúster de Apache Flink con HMS en Azure Portal, puede consultar las instrucciones detalladas en Creación de clúster Flink.
Después de la creación del clúster, compruebe si HMS se está ejecutando o no por el lado de AKS.
Preparación del tema de Kafka sobre datos de transacción de orden de usuario en HDInsight
Descargar el jar del cliente Kafka mediante el siguiente comando:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Descomprima el archivo tar con
tar -xvf kafka_2.12-3.2.0.tgz
Genere los mensajes en el tema de Kafka.
Otros comandos:
Nota:
Debe reemplazar el servidor de arranque por su propio nombre de host o IP de los agentes de Kafka
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Preparación de los datos maestros de orden de usuario en MySQL en Azure
Prueba de la base de datos:
Prepare la tabla de orden:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Usando SSH descarga el conector Kafka requerido y los jars de la base de datos MySQL
Nota:
Descargue la versión jar correcta según nuestra versión de HDInsight Kafka y la versión de MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
mover el archivo jar de planner
Mueva el archivo jar flink-table-planner_2.12-1.17.0-....jar ubicado en el pod de webssh /opt to /lib y salga del archivo jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ desde /lib. Consulte la incidencia para más detalles. Realice los pasos siguientes para mover el archivo jar de planner.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Nota:
Solo se necesita un movimiento jar adicional de planner al usar el dialecto de Hive o el punto de conexión de HiveServer2. Sin embargo, esta es la configuración recomendada para la integración de Hive.
Validation
Uso de rango/sql-cliente.sh para conectarse a Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Creación del catálogo de Hive y conexión al catálogo de Hive en Flink SQL
Nota:
Como ya usamos el clúster de Flink con Metastore Hive, no es necesario realizar ninguna configuración adicional.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Creación tabla de Kafka en Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Creación de tabla MySQL en Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Comprobación de tablas registradas en el catálogo de Hive anterior en Flink SQL
Recibir la información de la orden de transacción del usuario en la tabla maestra de órdenes en MySQL en Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Comprobar si los datos de orden de transacción de usuario en Kafka se agregan en el orden de la tabla maestra en MySQL en Azure Cloud Shell
Creación de otros tres órdenes de usuario en Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Comprobación de datos de tabla de Kafka en Flink SQL
Flink SQL> select * from kafka_user_orders;
Insertar product_id=104
en la tabla de órdenes en MySQL en Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Comprobación product_id = 104
de registro se agrega en la tabla de pedidos en MySQL en Azure Cloud Shell
Referencia
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink, y los nombres de proyecto de código abierto asociados son marcas comerciales de Apache Software Foundation(ASF).