Bagikan melalui


Operasi Apache Spark didukung oleh Apache Hive Warehouse Connector di Azure HDInsight

Artikel ini menunjukkan operasi berbasis spark yang didukung oleh Apache Hive Warehouse Connector (HWC). Semua contoh yang ditunjukkan di bawah ini akan dijalankan melalui shell Apache Spark.

Prasyarat

Selesaikan langkah-langkah Penyiapan Hive Warehouse Connector.

Memulai

Untuk memulai sesi spark-shell, lakukan langkah-langkah berikut:

  1. Gunakan perintah ssh untuk terhubung ke klaster Apache Spark Anda. Edit perintah di bawah ini dengan mengganti CLUSTERNAME dengan nama klaster Anda, lalu masukkan perintah:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Dari sesi ssh Anda, jalankan perintah berikut untuk mencatat versi hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Edit kode di bawah ini dengan versi hive-warehouse-connector-assembly yang diidentifikasi di atas. Kemudian jalankan perintah untuk memulai spark shell:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Setelah memulai spark shell, instans Apache Hive Warehouse Connector dapat dimulai menggunakan perintah berikut:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Membuat Spark DataFrame menggunakan kueri Apache Hive

Hasil dari semua kueri yang menggunakan pustaka HWC dikembalikan sebagai DataFrame. Contoh berikut menunjukkan cara membuat kueri Apache hive dasar.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Hasil kueri adalah Spark DataFrames, yang dapat digunakan dengan pustaka Spark seperti MLIB dan SparkSQL.

Menulis Spark DataFrames ke tabel Apache Hive

Spark tidak secara asli mendukung penulisan ke tabel ACID yang dikelola Apache Hive. Namun, menggunakan HWC, Anda dapat menulis DataFrame apa pun ke tabel Hive. Anda dapat melihat fungsionalitas ini di tempat kerja dalam contoh berikut:

  1. Buat tabel yang dipanggil sampletable_colorado dan tentukan kolomnya menggunakan perintah berikut ini:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Memfilter tabel hivesampletable di mana kolom state setara Colorado. Kueri hive ini mengembalikan DataFrame Spark yang disimpan dalam tabel Hive sampletable_colorado menggunakan fungsi write.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Lihat hasil dengan perintah berikut:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Penulisan streaming terstruktur

Menggunakan Apache Hive Warehouse Connector, Anda dapat menggunakan streaming Spark untuk menulis data ke tabel Apache Hive.

Penting

Penulisan streaming terstruktur tidak didukung di kluster Spark 4.0 yang diaktifkan oleh ESP.

Ikuti langkah-langkah di bawah ini untuk menyerap data dari aliran Spark di port localhost 9999 ke dalam tabel Apache Hive melalui. Apache Hive Warehouse Connector.

  1. Dari shell Spark Anda yang terbuka, mulailah aliran spark dengan perintah berikut:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Hasilkan data untuk aliran Spark yang Anda buat, dengan melakukan langkah-langkah berikut:

    1. Buka sesi SSH kedua pada kluster Spark yang sama.
    2. Pada prompt perintah, ketik nc -lk 9999. Perintah ini menggunakan utilitas netcat untuk mengirim data dari baris perintah ke port yang ditentukan.
  3. Kembali ke sesi SSH pertama dan buat tabel Apache Hive baru untuk menyimpan data streaming. Pada spark-shell, masukkan perintah berikut:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Kemudian tulis data streaming ke tabel yang baru dibuat menggunakan perintah berikut:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Penting

    Opsi metastoreUri dan database saat ini harus diatur secara manual karena masalah yang diketahui di Apache Spark. Untuk informasi selengkapnya tentang masalah ini, lihat SPARK-25460.

  5. Kembali ke sesi SSH kedua dan masukkan nilai berikut ini:

    foo
    HiveSpark
    bar
    
  6. Kembali ke sesi SSH pertama dan catat aktivitas singkatnya. Untuk menampilkan data, gunakan perintah berikut:

    hive.table("stream_table").show()
    

Gunakan Ctrl + C untuk menghentikan netcat pada sesi SSH kedua. Gunakan :q untuk keluar dari spark-shell pada sesi SSH pertama.

Langkah berikutnya