Table API وSQL في مجموعات Apache Flink® على HDInsight على AKS
هام
هذه الميزة في وضع المعاينة حاليًا. تتضمن شروط الاستخدام التكميلية لمعاينات Microsoft Azure المزيد من الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي أو قيد المعاينة أو التي لم يتم إصدارها بعد في التوفر العام. للحصول على معلومات حول هذه المعاينة المحددة، راجع معلومات معاينة Azure HDInsight على AKS. للأسئلة أو اقتراحات الميزات، يرجى إرسال طلب على AskHDInsight مع التفاصيل ومتابعتنا لمزيد من التحديثات على مجتمع Azure HDInsight.
يتميز Apache Flink بواجهة برمجة تطبيقات ارتباطية - واجهة برمجة تطبيقات الجدول وSQL - للتدفق الموحد ومعالجة الدفعات. واجهة برمجة تطبيقات الجدول هي واجهة برمجة تطبيقات استعلام متكاملة اللغة تسمح بتكوين الاستعلامات من عوامل التشغيل الارتباطية مثل التحديد والتصفية والانضمام بشكل بديهي. يعتمد دعم SQL الخاص ب Flink على Apache Calcite، الذي ينفذ معيار SQL.
تتكامل واجهات Table API وSQL بسلاسة مع بعضها البعض وواجهة برمجة تطبيقات DataStream الخاصة ب Flink. يمكنك التبديل بسهولة بين جميع واجهات برمجة التطبيقات والمكتبات، والتي تعتمد عليها.
Apache Flink SQL
مثل محركات SQL الأخرى، تعمل استعلامات Flink أعلى الجداول. يختلف عن قاعدة البيانات التقليدية لأن Flink لا يدير البيانات الثابتة محليا؛ بدلا من ذلك، تعمل الاستعلامات الخاصة به بشكل مستمر عبر الجداول الخارجية.
تبدأ مسارات معالجة البيانات Flink بجداول المصدر وتنتهي بجداول المتلقي. تنتج جداول المصدر صفوفا يتم تشغيلها أثناء تنفيذ الاستعلام؛ إنها الجداول المشار إليها في عبارة FROM للاستعلام. يمكن أن تكون الاتصال ors من نوع HDInsight Kafka أو HDInsight HBase أو Azure Event Hubs أو قواعد البيانات أو أنظمة الملفات أو أي نظام آخر يقع موصله في مسار الفئة.
استخدام عميل Flink SQL في HDInsight على مجموعات AKS
يمكنك الرجوع إلى هذه المقالة حول كيفية استخدام CLI من Secure Shell على مدخل Microsoft Azure. فيما يلي بعض العينات السريعة لكيفية البدء.
لبدء تشغيل عميل SQL
./bin/sql-client.sh
لتمرير ملف sql للتهيئة للتشغيل جنبا إلى جنب مع sql-client
./sql-client.sh -i /path/to/init_file.sql
لتعيين تكوين في sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
يدعم Flink SQL عبارات CREATE التالية
- إنشاء جدول
- إنشاء قاعدة بيانات
- إنشاء كتالوج
فيما يلي مثال لبناء الجملة لتعريف جدول مصدر باستخدام موصل jdbc للاتصال ب MSSQL، مع المعرف، واسم كأعمدة في عبارة CREATE TABLE
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
إنشاء قاعدة بيانات :
CREATE DATABASE students;
إنشاء كتالوج:
CREATE CATALOG myhive WITH ('type'='hive');
يمكنك تشغيل الاستعلامات المستمرة أعلى هذه الجداول
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
اكتب إلى Sink Table من Source Table:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
إضافة تبعيات
يتم استخدام عبارات JAR لإضافة jars للمستخدم في classpath أو إزالة jars المستخدم من classpath أو إظهار jars المضافة في classpath في وقت التشغيل.
يدعم Flink SQL عبارات JAR التالية:
- إضافة JAR
- إظهار JARS
- إزالة JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive Metastore في مجموعات Apache Flink® على HDInsight على AKS
توفر الكتالوجات بيانات تعريف، مثل قواعد البيانات والجداول والأقسام وطرق العرض والوظائف والمعلومات اللازمة للوصول إلى البيانات المخزنة في قاعدة بيانات أو أنظمة خارجية أخرى.
في HDInsight على AKS، ندعم Flink خيارين للكتالوج:
GenericInMemoryCatalog
GenericInMemoryCatalog هو تنفيذ في الذاكرة للكتالوج. تتوفر جميع الكائنات فقط طوال مدة جلسة sql.
HiveCatalog
يخدم HiveCatalog غرضين؛ كمخزن مستمر لبيانات تعريف Flink النقية، وكواجهة لقراءة وكتابة بيانات تعريف Hive الموجودة.
إشعار
يأتي HDInsight على مجموعات AKS مع خيار متكامل من Hive Metastore ل Apache Flink. يمكنك اختيار Hive Metastore أثناء إنشاء نظام المجموعة
كيفية إنشاء قواعد بيانات Flink وتسجيلها في الكتالوجات
يمكنك الرجوع إلى هذه المقالة حول كيفية استخدام CLI والبدء في Flink SQL Client من Secure Shell على مدخل Microsoft Azure.
بدء
sql-client.sh
جلسة العملDefault_catalog هو كتالوج الذاكرة الافتراضي
دعونا الآن نتحقق من قاعدة البيانات الافتراضية للكتالوج في الذاكرة
دعونا ننشئ كتالوج Hive للإصدار 3.1.2 ونستخدمه
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
إشعار
HDInsight على AKS يدعم Hive 3.1.2 و Hadoop 3.3.2.
hive-conf-dir
تم تعيين إلى الموقع/opt/hive-conf
دعونا ننشئ قاعدة بيانات في كتالوج الخلية ونجعلها افتراضية لجلسة العمل (ما لم يتم تغييرها).
كيفية إنشاء جداول Hive وتسجيلها في كتالوج Hive
اتبع الإرشادات حول كيفية إنشاء قواعد بيانات Flink وتسجيلها في الكتالوج
دعونا ننشئ Flink Table من نوع الموصل Hive بدون قسم
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
إدراج بيانات في hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
قراءة البيانات من hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
إشعار
يقع دليل Hive Warehouse في حاوية حساب التخزين المعينة التي تم اختيارها أثناء إنشاء مجموعة Apache Flink، ويمكن العثور عليها في الدليل hive/warehouse/
يتيح إنشاء جدول Flink لنوع الموصل الخلية مع القسم
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
هام
هناك قيود معروفة في Apache Flink. يتم اختيار الأعمدة 'n' الأخيرة للأقسام، بغض النظر عن عمود القسم المحدد من قبل المستخدم. FLINK-32596 سيكون مفتاح القسم خاطئا عند استخدام لهجة Flink لإنشاء جدول Hive.
المرجع
- Apache Flink Table API وSQL
- Apache وApache Flink وFlink وأسماء مشاريع مصدر مفتوح المقترنة هي علامات تجارية ل Apache Software Foundation (ASF).