Udostępnij za pośrednictwem


How to use Hive Catalog with Apache Flink on HDInsight on AKS (Jak używać katalogu Hive z usługą Apache Flink® w usłudze HDInsight w usłudze AKS)

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

W tym przykładzie magazyn metadanych hive jest używany jako katalog trwały z katalogiem Hive platformy Apache Flink. Używamy tej funkcji do przechowywania metadanych tabeli platformy Kafka i tabeli MySQL w języku Flink między sesjami. Funkcja Flink używa tabeli platformy Kafka zarejestrowanej w katalogu Hive jako źródła, wykonaj pewne wyniki wyszukiwania i ujścia do bazy danych MySQL

Wymagania wstępne

Funkcja Flink oferuje 2-krotną integrację z programem Hive.

  • Pierwszym krokiem jest użycie magazynu metadanych Hive (HMS) jako wykazu trwałego z usługą HiveCatalog Flink do przechowywania metadanych specyficznych dla języka Flink między sesjami.
    • Na przykład użytkownicy mogą przechowywać tabele Kafka lub ElasticSearch w magazynie metadanych Hive przy użyciu programu HiveCatalog i ponownie używać ich później w zapytaniach SQL.
  • Drugim jest oferowanie Flink jako alternatywnego aparatu do odczytywania i pisania tabel Hive.
  • Serwer HiveCatalog został zaprojektowany tak, aby był "poza pudełkiem" zgodny z istniejącymi instalacjami programu Hive. Nie musisz modyfikować istniejącego magazynu metadanych Hive ani zmieniać umieszczania danych ani partycjonowania tabel.

Aby uzyskać więcej informacji, zobacz Apache Hive

Przygotowanie środowiska

Umożliwia utworzenie klastra Apache Flink za pomocą systemu HMS w witrynie Azure Portal. Szczegółowe instrukcje dotyczące tworzenia klastra Flink można znaleźć w artykule .

Zrzut ekranu przedstawiający sposób tworzenia klastra Flink.

Po utworzeniu klastra sprawdź, czy HMS jest uruchomiony, czy nie po stronie usługi AKS.

Zrzut ekranu przedstawiający sposób sprawdzania stanu HMS w klastrze Flink.

Przygotowywanie tematu danych transakcji zamówienia użytkownika platformy Kafka w usłudze HDInsight

Pobierz plik jar klienta platformy Kafka przy użyciu następującego polecenia:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Usuń plik tar z

tar -xvf kafka_2.12-3.2.0.tgz

Wygeneruj komunikaty do tematu platformy Kafka.

Zrzut ekranu przedstawiający sposób tworzenia komunikatów w temacie platformy Kafka.

Inne polecenia:

Uwaga

Musisz zastąpić bootstrap-server własną nazwą hosta lub adresem IP brokerów platformy Kafka

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Przygotowywanie danych głównych zamówień użytkownika w usłudze MySQL na platformie Azure

Testowanie bazy danych:

Zrzut ekranu przedstawiający sposób testowania bazy danych na platformie Kafka.

Zrzut ekranu przedstawiający sposób uruchamiania usługi Cloud Shell w portalu.

Przygotuj tabelę zamówień:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Korzystanie z wymaganego łącznika platformy Kafka przy użyciu protokołu SSH i plików JAR bazy danych MySQL

Uwaga

Pobierz prawidłowy plik jar wersji zgodnie z naszą wersją platformy Kafka usługi HDInsight i wersją programu MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Przenoszenie pliku jar planisty

Przenieś plik jar flink-table-planner_2.12-1.17.0-.... plik jar znajdujący się w zasobnikach webssh /opt do /lib i wyjdź plik jar flink-table-planner-loader1.17.0-..... jar /opt/flink-webssh/opt/ from /lib. Aby uzyskać więcej informacji, zobacz problem . Wykonaj następujące kroki, aby przenieść plik jar planisty.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Uwaga

Dodatkowy plik jar planisty jest potrzebny tylko w przypadku korzystania z dialektu Hive lub punktu końcowego HiveServer2. Jest to jednak zalecana konfiguracja integracji programu Hive.

Walidacja

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Uwaga

Ponieważ używamy już klastra Flink z magazynem metadanych Hive, nie ma potrzeby wykonywania żadnych dodatkowych konfiguracji.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Zrzut ekranu przedstawiający sposób tworzenia tabeli platformy Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Zrzut ekranu przedstawiający sposób tworzenia tabeli mysql.

Zrzut ekranu przedstawiający dane wyjściowe tabeli.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Zrzut ekranu przedstawiający sposób ujścia transakcji użytkownika.

Zrzut ekranu przedstawiający interfejs użytkownika Flink.

Sprawdź, czy dane zamówienia transakcji użytkownika na platformie Kafka są dodawane w kolejności tabeli głównej w usłudze MySQL w usłudze Azure Cloud Shell

Zrzut ekranu przedstawiający sposób sprawdzania transakcji użytkownika.

Tworzenie trzech kolejnych zamówień użytkowników na platformie Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Zrzut ekranu przedstawiający sposób sprawdzania danych tabeli platformy Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Zrzut ekranu przedstawiający sposób sprawdzania tabeli zamówień.

Sprawdzanie product_id = 104 rekordu jest dodawane w tabeli zamówień w usłudze MySQL w usłudze Azure Cloud Shell

Zrzut ekranu przedstawiający rekordy dodane do tabeli zamówień.

Odwołanie