عمليات Apache Spark التي يدعمها Hive Warehouse Connector في Azure HDInsight

توضح هذه المقالة العمليات المستندة إلى شرارة، والمدعومة بواسطة Hive Warehouse Connector (HWC). سيتم تنفيذ جميع الأمثلة الموضحة أدناه من خلال غلاف Apache Spark.

المتطلب الأساسي

أكمل خطوات إعداد موصل Hive Warehouse.

الشروع في العمل

لبدء جلسة شرارة، قم بالخطوات التالية:

  1. استخدم الأمر ssh للاتصال بنظام مجموعة Apache Spark الخاص بك. قم بتحرير الأمر أدناه عن طريق استبدال اسم نظام المجموعة باسم نظام مجموعتك ثم إدخال الأمر:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. من جلسة ssh الخاصة بك، قم بتنفيذ الأمر التالي لملاحظة إصدار hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. قم بتحرير التعليمة البرمجية أدناه باستخدام الإصدار hive-warehouse-connector-assembly المحدد أعلاه. ثم نفّذ الأمر لبدء spark shell:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. بعد بدء إصدار spark-shell، يمكن بدء مثيل Hive Warehouse Connector باستخدام الأوامر التالية:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

تكوين Spark DataFrames باستخدام استعلامات Hive

يتم إرجاع نتائج كافة الاستعلامات التي تستخدم مكتبة HWC كإطار بيانات. توضح الأمثلة التالية كيفية إنشاء استعلام خلية أساسي.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

نتائج الاستعلام هي Spark DataFrames، والتي يمكن استخدامها مع مكتبات Spark؛ مثل: MLIB، وSparkSQL.

كتابة Spark DataFrames لجداول Hive

لا يدعم Spark أصلاً الكتابة إلى جداول ACID المُدارة الخاصة بـ Hive. ومع ذلك، باستخدام HWC، يمكنك كتابة أي DataFrame في جدول Hive. يمكنك رؤية هذه الوظيفة في العمل في المثال التالي:

  1. أنشئ جدولا يسمى sampletable_colorado وحدد أعمدةه باستخدام الأمر التالي:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. قم بتصفية الجدول hivesampletable حيث يساوي Coloradoالعمود state . يقوم استعلام الخلية هذا بإرجاع Spark DataFrame ويتم حفظ النتيجة في جدول sampletable_colorado Hive باستخدام الدالة write .

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. اعرض النتائج بالأمر التالي:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

يكتب التدفق المنظم

باستخدام Hive Warehouse Connector، يمكنك استخدام Spark Streaming لكتابة البيانات في جداول Hive.

هام

عمليات الكتابة المتدفقة الهيكلية غير مدعومة في مجموعات Spark 4.0 التي تم تمكين ESP لها.

اتبع الخطوات أدناه لاستيعاب البيانات من تدفق شرارة على منفذ المضيف المحلي 9999 في جدول خلية عبر. موصل مستودع خلية النحل.

  1. من غلاف Spark المفتوح، ابدأ دفق شرارة باستخدام الأمر التالي:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. قم بإنشاء بيانات لتيار Spark الذي قمت بإنشائه، من خلال القيام بالخطوات التالية:

    1. افتح جلسة SSH ثانية على نفس مجموعة Spark.
    2. في نافذة الأوامر، يُرجى كتابة nc -lk 9999. يستخدم netcat هذا الأمر الأداة المساعدة لإرسال البيانات من سطر الأوامر إلى المنفذ المحدد.
  3. ارجع إلى جلسة SSH الأولى، وأنشئ جدول Hive جديدًا لاحتواء البيانات المتدفقة. في قذيفة شرارة، أدخل الأمر التالي:

    hive.createTable("stream_table").column("value","string").create()
    
  4. ثم اكتب بيانات التدفق إلى الجدول الذي تم إنشاؤه حديثًا باستخدام الأمر التالي:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    هام

    يجب تعيين الخيارين metastoreUri و database يدويا حاليا بسبب مشكلة معروفة في Apache Spark. لمزيد من المعلومات حول هذه المشكلة، راجع SPARK-25460.

  5. ارجع إلى جلسة SSH الثانية، وأدخل القيم التالية:

    foo
    HiveSpark
    bar
    
  6. ارجع إلى جلسة SSH الأولى، ولاحظ النشاط الموجز. استخدم الأمر التالي لعرض البيانات:

    hive.table("stream_table").show()
    

استخدم Ctrl + C للتوقف netcat في جلسة SSH الثانية. استخدم :q للخروج من spark-shell في جلسة SSH الأولى.

الخطوات التالية