Apache Spark operations supported by Hive Warehouse Connector in Azure HDInsight
This article shows spark-based operations supported by Hive Warehouse Connector (HWC). All examples shown below will be executed through the Apache Spark shell.
Complete the Hive Warehouse Connector setup steps.
To start a spark-shell session, do the following steps:
Use ssh command to connect to your Apache Spark cluster. Edit the command below by replacing CLUSTERNAME with the name of your cluster, and then enter the command:
From your ssh session, execute the following command to note the
Edit the code below with the
hive-warehouse-connector-assemblyversion identified above. Then execute the command to start the spark shell:
spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
After starting the spark-shell, a Hive Warehouse Connector instance can be started using the following commands:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Creating Spark DataFrames using Hive queries
The results of all queries using the HWC library are returned as a DataFrame. The following examples demonstrate how to create a basic hive query.
hive.setDatabase("default") val df = hive.executeQuery("select * from hivesampletable") df.filter("state = 'Colorado'").show()
The results of the query are Spark DataFrames, which can be used with Spark libraries like MLIB and SparkSQL.
Writing out Spark DataFrames to Hive tables
Spark doesn't natively support writing to Hive's managed ACID tables. However, using HWC, you can write out any DataFrame to a Hive table. You can see this functionality at work in the following example:
Create a table called
sampletable_coloradoand specify its columns using the following command:
Filter the table
hivesampletablewhere the column
Colorado. This hive query returns a Spark DataFrame and result is saved in the Hive table
hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
View the results with the following command:
Structured streaming writes
Using Hive Warehouse Connector, you can use Spark streaming to write data into Hive tables.
Structured streaming writes are not supported in ESP enabled Spark 4.0 clusters.
Follow the steps below to ingest data from a Spark stream on localhost port 9999 into a Hive table via. Hive Warehouse Connector.
From your open Spark shell, begin a spark stream with the following command:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Generate data for the Spark stream that you created, by doing the following steps:
- Open a second SSH session on the same Spark cluster.
- At the command prompt, type
nc -lk 9999. This command uses the
netcatutility to send data from the command line to the specified port.
Return to the first SSH session and create a new Hive table to hold the streaming data. At the spark-shell, enter the following command:
Then write the streaming data to the newly created table using the following command:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
databaseoptions must currently be set manually due to a known issue in Apache Spark. For more information about this issue, see SPARK-25460.
Return to the second SSH session and enter the following values:
foo HiveSpark bar
Return to the first SSH session and note the brief activity. Use the following command to view the data:
Use Ctrl + C to stop
netcat on the second SSH session. Use
:q to exit spark-shell on the first SSH session.