Teilen über


Verwenden des Hive-Katalogs mit Apache Flink® in HDInsight on AKS

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

In diesem Beispiel wird der Metastore von Hive als persistenter Katalog mit dem Hive-Katalog von Apache Flink verwendet. Diese Funktion dient hier dazu, Kafka- und MySQL-Tabellenmetadaten sitzungsübergreifend in Flink zu speichern. Flink verwendet eine Kafka-Tabelle, die im Hive-Katalog als Quelle registriert ist, führt einige Lookupvorgänge aus und platziert das Ergebnis in einer MySQL-Datenbank, die hier als Senke fungiert.

Voraussetzungen

Flink bietet eine zweifache Hive-Integration.

  • Der erste Schritt besteht darin, Hive Metastore (HMS) als persistenten Katalog mit HiveCatalog von Flink zu verwenden, um Flink-spezifische Metadaten sitzungsübergreifend zu speichern.
    • Beispielsweise können Benutzer*innen ihre Kafka- oder ElasticSearch-Tabellen mithilfe von HiveCatalog in Hive Metastore speichern und sie später in SQL-Abfragen wiederverwenden.
  • Der zweite Schritt besteht darin, Flink als alternative Engine zum Lesen und Schreiben von Hive-Tabellen bereitzustellen.
  • Der Hive-Katalog ist standardmäßig mit bereits vorhandenen Hive-Installationen kompatibel. Sie müssen weder Ihre vorhandene Hive Metastore-Instanz noch die Datenplatzierung oder die Partitionierung Ihrer Tabellen ändern.

Weitere Informationen finden Sie unter Apache Hive.

Umgebungsvorbereitung

Hier wird ein Apache Flink-Cluster mit HMS im Azure-Portal erstellt. Eine ausführliche Anleitung zum Erstellen eines Flink-Clusters finden Sie hier.

Screenshot des Erstellens eines Flink-Clusters.

Überprüfen Sie nach der Clustererstellung auf der AKS-Seite, ob HMS ausgeführt wird.

Screenshot des Überprüfens des HMS-Status im Flink-Cluster.

Vorbereiten des Kafka-Themas für Transaktionsdaten von Benutzerbestellungen in HDInsight

Laden Sie die JAR-Datei des Kafka-Clients mithilfe des folgenden Befehls herunter:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Entpacken Sie die TAR-Datei wie folgt:

tar -xvf kafka_2.12-3.2.0.tgz

Generieren Sie die Meldungen für das Kafka-Thema.

Screenshot des Erstellen von Nachrichten zum Kafka-Thema.

Weitere Befehle:

Hinweis

Ersetzen Sie „bootstrap-server“ durch den Hostnamen oder die IP-Adresse Ihrer eigenen Kafka-Broker.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Vorbereiten von Masterdaten für Benutzerbestellungen in MySQL in Azure

Testen der Datenbank:

Screenshot des Testens der Datenbank in Kafka.

Screenshot des Ausführens von Cloud Shell im Portal.

Bereiten Sie die Bestelltabelle vor:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Verwenden der für SSH-Downloads erforderlichen JAR-Dateien für Kafka-Connector und MySQL-Datenbank

Hinweis

Laden Sie die richtige JAR-Version für die HDInsight-Kafka-Version und die MySQL-Version herunter.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Verschieben der JAR-Datei für den Planer

Verschieben Sie die JAR-Datei „flink-table-planner_2.12-1.17.0-....jar“ im Verzeichnis „/opt“ des WebSSH-Pods nach „/lib“, und verschieben Sie die JAR-Datei „flink-table-planner-loader1.17.0-....jar“ von „/lib“ nach „/opt/flink-webssh/opt/“. Ausführlichere Informationen finden Sie hier. Gehen Sie wie folgt vor, um die JAR-Datei für den Planer zu verschieben:

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Hinweis

Die Verschiebung einer weiteren JAR-Datei für den Planer ist nur erforderlich, wenn Sie einen Hive-Dialekt oder einen HiveServer2-Endpunkt verwenden. Dies ist jedoch das empfohlene Setup für die Hive-Integration.

Überprüfen

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Hinweis

Da wir bereits einen Flink-Cluster mit Hive Metastore verwenden, sind keine weiteren Konfigurationsschritte erforderlich.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Screenshot des Erstellens einer Kafka-Tabelle.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Screenshot des Erstellens einer MySQL-Tabelle.

Screenshot der Tabellenausgabe.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Screenshot des Verschiebens einer Benutzertransaktion in eine Senke.

Screenshot der Flink-Benutzeroberfläche.

Überprüfen, ob die Daten zu Benutzerbestelltransaktionen in Kafka der Masterbestellungstabelle in MySQL in Azure Cloud Shell hinzugefügt wurden

Screenshot des Überprüfens einer Benutzertransaktion.

Erstellen drei weiterer Benutzerbestellungen in Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Screenshot des Überprüfens von Kafka-Tabellendaten.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Screenshot des Überprüfens der Tabelle „orders“.

Überprüfen, ob der Datensatz product_id = 104 in der Bestellungstabelle in MySQL in Azure Cloud Shell hinzugefügt wurde

Screenshot der Datensätze, die der Tabelle „orders“ hinzugefügt wurden.

Verweis