Partekatu honen bidez:


Uso del catálogo de Hive con Apache Flink® en HDInsight en AKS

Importante

Esta funcionalidad actualmente está en su versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulta la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envía una solicitud en AskHDInsight con los detalles y síguenos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.

En este ejemplo se usa Metastore de Hive como catálogo persistente con el catálogo de Hive de Apache Flink’s. Usamos esta funcionalidad para almacenar tablas Kafka y metadatos de tablas MySQL en Flink entre sesiones. Flink usa la tabla Kafka registrada en el catálogo de Hive como origen, realizar algunos resultados de búsqueda y receptor en la base de datos MySQL

Requisitos previos

Flink ofrece una integración de dos plegamientos con Hive.

  • El primer paso es usar Metastore Hive (HMS) como catálogo persistente con HiveCatalog de Flink para almacenar metadatos específicos de Flink entre sesiones.
    • Por ejemplo, los usuarios pueden almacenar sus tablas de Kafka o ElasticSearch en Metastore de Hive mediante HiveCatalog y reutilizarlas más adelante en consultas SQL.
  • El segundo es ofrecer Flink como un motor alternativo para leer y escribir tablas de Hive.
  • HiveCatalog está diseñado para ser compatible con las instalaciones existentes de Hive. No es necesario modificar el Metastore de Hive existente ni cambiar la ubicación de datos ni la creación de particiones de las tablas.

Para más información, consulte Apache Hive

Preparación del entorno

Permite crear un clúster de Apache Flink con HMS en Azure Portal, puede consultar las instrucciones detalladas en Creación de clúster Flink.

Captura de pantalla que muestra cómo crear un clúster de Flink.

Después de la creación del clúster, compruebe si HMS se está ejecutando o no por el lado de AKS.

Captura de pantalla que muestra cómo comprobar el estado de HMS en el clúster de Flink.

Preparación del tema de Kafka sobre datos de transacción de orden de usuario en HDInsight

Descargar el jar del cliente Kafka mediante el siguiente comando:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Descomprima el archivo tar con

tar -xvf kafka_2.12-3.2.0.tgz

Genere los mensajes en el tema de Kafka.

Captura de pantalla que muestra cómo generar mensajes en el tema de Kafka.

Otros comandos:

Nota:

Debe reemplazar el servidor de arranque por su propio nombre de host o IP de los agentes de Kafka

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Preparación de los datos maestros de orden de usuario en MySQL en Azure

Prueba de la base de datos:

Captura de pantalla que muestra cómo probar la base de datos en Kafka.

Captura de pantalla que muestra cómo ejecutar Cloud Shell en el portal.

Prepare la tabla de orden:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Usando SSH descarga el conector Kafka requerido y los jars de la base de datos MySQL

Nota:

Descargue la versión jar correcta según nuestra versión de HDInsight Kafka y la versión de MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

mover el archivo jar de planner

Mueva el archivo jar flink-table-planner_2.12-1.17.0-....jar ubicado en el pod de webssh /opt to /lib y salga del archivo jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ desde /lib. Consulte la incidencia para más detalles. Realice los pasos siguientes para mover el archivo jar de planner.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Nota:

Solo se necesita un movimiento jar adicional de planner al usar el dialecto de Hive o el punto de conexión de HiveServer2. Sin embargo, esta es la configuración recomendada para la integración de Hive.

Validation

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Nota:

Como ya usamos el clúster de Flink con Metastore Hive, no es necesario realizar ninguna configuración adicional.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Captura de pantalla en la que se muestra cómo crear una tabla de Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Captura de pantalla que muestra cómo crear una tabla de mysql.

Captura de pantalla que muestra la salida de la tabla.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Captura de pantalla que muestra cómo enviar a un destino la transacción de usuario.

Captura de pantalla que muestra la interfaz de usuario de Flink.

Comprobar si los datos de orden de transacción de usuario en Kafka se agregan en el orden de la tabla maestra en MySQL en Azure Cloud Shell

Captura de pantalla que muestra cómo comprobar la transacción de usuario.

Creación de otros tres órdenes de usuario en Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Captura de pantalla que muestra cómo comprobar los datos de la tabla de Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Captura de pantalla que muestra cómo comprobar la tabla de pedidos.

Comprobación product_id = 104 de registro se agrega en la tabla de pedidos en MySQL en Azure Cloud Shell

Captura de pantalla que muestra los registros agregados a la tabla de pedidos.

Referencia