Apache Spark-bewerkingen die worden ondersteund door Hive Warehouse Connector in Azure HDInsight
Dit artikel bevat spark-gebaseerde bewerkingen die worden ondersteund door Hive Warehouse Connector (HWC). Alle weergegeven voorbeelden worden uitgevoerd via de Apache Spark-shell.
Vereiste
Voltooi de installatiestappen van de Hive Warehouse-connector.
Aan de slag
Ga als volgt te werk om een spark-shell-sessie te starten:
Gebruik de ssh-opdracht om verbinding te maken met uw Apache Spark-cluster. Bewerk de opdracht door CLUSTERNAME te vervangen door de naam van uw cluster en voer vervolgens de opdracht in:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Voer vanuit uw ssh-sessie de volgende opdracht uit om de
hive-warehouse-connector-assembly
versie te noteren:ls /usr/hdp/current/hive_warehouse_connector
Bewerk de code met de
hive-warehouse-connector-assembly
hierboven geïdentificeerde versie. Voer vervolgens de opdracht uit om de Spark Shell te starten:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Nadat u de spark-shell hebt gestart, kan een Hive Warehouse Connector-exemplaar worden gestart met behulp van de volgende opdrachten:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Spark DataFrames maken met Hive-query's
De resultaten van alle query's die gebruikmaken van de HWC-bibliotheek, worden geretourneerd als een DataFrame. In de volgende voorbeelden ziet u hoe u een eenvoudige Hive-query maakt.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
De resultaten van de query zijn Spark DataFrames, die kunnen worden gebruikt met Spark-bibliotheken zoals MLIB en SparkSQL.
Spark DataFrames naar Hive-tabellen schrijven
Spark biedt geen systeemeigen ondersteuning voor het schrijven naar de beheerde ACID-tabellen van Hive. Met HWC kunt u echter elk DataFrame naar een Hive-tabel schrijven. U kunt deze functionaliteit op het werk zien in het volgende voorbeeld:
Maak een tabel met de naam
sampletable_colorado
en geef de kolommen op met behulp van de volgende opdracht:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filter de tabel
hivesampletable
waarin de kolomstate
gelijk isColorado
aan . Deze Hive-query retourneert een Spark DataFrame en het resultaat wordt opgeslagen in de Hive-tabelsampletable_colorado
met behulp van dewrite
functie.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Bekijk de resultaten met de volgende opdracht:
hive.table("sampletable_colorado").show()
Gestructureerde streaming-schrijfbewerkingen
Met Hive Warehouse Connector kunt u Spark-streaming gebruiken om gegevens naar Hive-tabellen te schrijven.
Belangrijk
Gestructureerde streaming-schrijfbewerkingen worden niet ondersteund in Spark 4.0-clusters met ESP.
Volg de stappen voor het opnemen van gegevens uit een Spark-stream op localhost-poort 9999 in een Hive-tabel via. Hive Warehouse Connector.
Start vanuit uw open Spark-shell een Spark-stream met de volgende opdracht:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Genereer gegevens voor de Spark-stroom die u hebt gemaakt door de volgende stappen uit te voeren:
- Open een tweede SSH-sessie in hetzelfde Spark-cluster.
- Typ bij de opdrachtprompt
nc -lk 9999
. Met deze opdracht wordt hetnetcat
hulpprogramma gebruikt om gegevens vanaf de opdrachtregel naar de opgegeven poort te verzenden.
Ga terug naar de eerste SSH-sessie en maak een nieuwe Hive-tabel voor het opslaan van de streaminggegevens. Voer in de spark-shell de volgende opdracht in:
hive.createTable("stream_table").column("value","string").create()
Schrijf vervolgens de streaminggegevens naar de zojuist gemaakte tabel met behulp van de volgende opdracht:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Belangrijk
De
metastoreUri
opties endatabase
opties moeten momenteel handmatig worden ingesteld vanwege een bekend probleem in Apache Spark. Zie SPARK-25460 voor meer informatie over dit probleem.Ga terug naar de tweede SSH-sessie en voer de volgende waarden in:
foo HiveSpark bar
Ga terug naar de eerste SSH-sessie en noteer de korte activiteit. Gebruik de volgende opdracht om de gegevens weer te geven:
hive.table("stream_table").show()
Gebruik Ctrl + C om te stoppen netcat
op de tweede SSH-sessie. Gebruik :q
dit om spark-shell af te sluiten bij de eerste SSH-sessie.