Operasi Apache Spark didukung oleh Apache Hive Warehouse Connector di Azure HDInsight
Artikel ini menunjukkan operasi berbasis spark yang didukung oleh Apache Hive Warehouse Connector (HWC). Semua contoh yang ditunjukkan di bawah ini akan dijalankan melalui shell Apache Spark.
Prasyarat
Selesaikan langkah-langkah Penyiapan Hive Warehouse Connector.
Memulai
Untuk memulai sesi spark-shell, lakukan langkah-langkah berikut:
Gunakan perintah ssh untuk terhubung ke klaster Apache Spark Anda. Edit perintah di bawah ini dengan mengganti CLUSTERNAME dengan nama klaster Anda, lalu masukkan perintah:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Dari sesi ssh Anda, jalankan perintah berikut untuk mencatat versi
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Edit kode di bawah ini dengan versi
hive-warehouse-connector-assembly
yang diidentifikasi di atas. Kemudian jalankan perintah untuk memulai spark shell:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Setelah memulai spark shell, instans Apache Hive Warehouse Connector dapat dimulai menggunakan perintah berikut:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Membuat Spark DataFrame menggunakan kueri Apache Hive
Hasil dari semua kueri yang menggunakan pustaka HWC dikembalikan sebagai DataFrame. Contoh berikut menunjukkan cara membuat kueri Apache hive dasar.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Hasil kueri adalah Spark DataFrames, yang dapat digunakan dengan pustaka Spark seperti MLIB dan SparkSQL.
Menulis Spark DataFrames ke tabel Apache Hive
Spark tidak secara asli mendukung penulisan ke tabel ACID yang dikelola Apache Hive. Namun, menggunakan HWC, Anda dapat menulis DataFrame apa pun ke tabel Hive. Anda dapat melihat fungsionalitas ini di tempat kerja dalam contoh berikut:
Buat tabel yang dipanggil
sampletable_colorado
dan tentukan kolomnya menggunakan perintah berikut ini:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Memfilter tabel
hivesampletable
di mana kolomstate
setaraColorado
. Kueri hive ini mengembalikan DataFrame Spark yang disimpan dalam tabel Hivesampletable_colorado
menggunakan fungsiwrite
.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Lihat hasil dengan perintah berikut:
hive.table("sampletable_colorado").show()
Penulisan streaming terstruktur
Menggunakan Apache Hive Warehouse Connector, Anda dapat menggunakan streaming Spark untuk menulis data ke tabel Apache Hive.
Penting
Penulisan streaming terstruktur tidak didukung di kluster Spark 4.0 yang diaktifkan oleh ESP.
Ikuti langkah-langkah di bawah ini untuk menyerap data dari aliran Spark di port localhost 9999 ke dalam tabel Apache Hive melalui. Apache Hive Warehouse Connector.
Dari shell Spark Anda yang terbuka, mulailah aliran spark dengan perintah berikut:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Hasilkan data untuk aliran Spark yang Anda buat, dengan melakukan langkah-langkah berikut:
- Buka sesi SSH kedua pada kluster Spark yang sama.
- Pada prompt perintah, ketik
nc -lk 9999
. Perintah ini menggunakan utilitasnetcat
untuk mengirim data dari baris perintah ke port yang ditentukan.
Kembali ke sesi SSH pertama dan buat tabel Apache Hive baru untuk menyimpan data streaming. Pada spark-shell, masukkan perintah berikut:
hive.createTable("stream_table").column("value","string").create()
Kemudian tulis data streaming ke tabel yang baru dibuat menggunakan perintah berikut:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Penting
Opsi
metastoreUri
dandatabase
saat ini harus diatur secara manual karena masalah yang diketahui di Apache Spark. Untuk informasi selengkapnya tentang masalah ini, lihat SPARK-25460.Kembali ke sesi SSH kedua dan masukkan nilai berikut ini:
foo HiveSpark bar
Kembali ke sesi SSH pertama dan catat aktivitas singkatnya. Untuk menampilkan data, gunakan perintah berikut:
hive.table("stream_table").show()
Gunakan Ctrl + C untuk menghentikan netcat
pada sesi SSH kedua. Gunakan :q
untuk keluar dari spark-shell pada sesi SSH pertama.
Langkah berikutnya
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk