Delen via


Apache Spark-bewerkingen die worden ondersteund door Hive Warehouse Connector in Azure HDInsight

Dit artikel bevat spark-gebaseerde bewerkingen die worden ondersteund door Hive Warehouse Connector (HWC). Alle weergegeven voorbeelden worden uitgevoerd via de Apache Spark-shell.

Vereiste

Voltooi de installatiestappen van de Hive Warehouse-connector.

Aan de slag

Ga als volgt te werk om een spark-shell-sessie te starten:

  1. Gebruik de ssh-opdracht om verbinding te maken met uw Apache Spark-cluster. Bewerk de opdracht door CLUSTERNAME te vervangen door de naam van uw cluster en voer vervolgens de opdracht in:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Voer vanuit uw ssh-sessie de volgende opdracht uit om de hive-warehouse-connector-assembly versie te noteren:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Bewerk de code met de hive-warehouse-connector-assembly hierboven geïdentificeerde versie. Voer vervolgens de opdracht uit om de Spark Shell te starten:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Nadat u de spark-shell hebt gestart, kan een Hive Warehouse Connector-exemplaar worden gestart met behulp van de volgende opdrachten:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Spark DataFrames maken met Hive-query's

De resultaten van alle query's die gebruikmaken van de HWC-bibliotheek, worden geretourneerd als een DataFrame. In de volgende voorbeelden ziet u hoe u een eenvoudige Hive-query maakt.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

De resultaten van de query zijn Spark DataFrames, die kunnen worden gebruikt met Spark-bibliotheken zoals MLIB en SparkSQL.

Spark DataFrames naar Hive-tabellen schrijven

Spark biedt geen systeemeigen ondersteuning voor het schrijven naar de beheerde ACID-tabellen van Hive. Met HWC kunt u echter elk DataFrame naar een Hive-tabel schrijven. U kunt deze functionaliteit op het werk zien in het volgende voorbeeld:

  1. Maak een tabel met de naam sampletable_colorado en geef de kolommen op met behulp van de volgende opdracht:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filter de tabel hivesampletable waarin de kolom state gelijk is Coloradoaan . Deze Hive-query retourneert een Spark DataFrame en het resultaat wordt opgeslagen in de Hive-tabel sampletable_colorado met behulp van de write functie.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Bekijk de resultaten met de volgende opdracht:

    hive.table("sampletable_colorado").show()
    

    Hive Warehouse-connector toont hive-tabel.

Gestructureerde streaming-schrijfbewerkingen

Met Hive Warehouse Connector kunt u Spark-streaming gebruiken om gegevens naar Hive-tabellen te schrijven.

Belangrijk

Gestructureerde streaming-schrijfbewerkingen worden niet ondersteund in Spark 4.0-clusters met ESP.

Volg de stappen voor het opnemen van gegevens uit een Spark-stream op localhost-poort 9999 in een Hive-tabel via. Hive Warehouse Connector.

  1. Start vanuit uw open Spark-shell een Spark-stream met de volgende opdracht:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Genereer gegevens voor de Spark-stroom die u hebt gemaakt door de volgende stappen uit te voeren:

    1. Open een tweede SSH-sessie in hetzelfde Spark-cluster.
    2. Typ bij de opdrachtprompt nc -lk 9999. Met deze opdracht wordt het netcat hulpprogramma gebruikt om gegevens vanaf de opdrachtregel naar de opgegeven poort te verzenden.
  3. Ga terug naar de eerste SSH-sessie en maak een nieuwe Hive-tabel voor het opslaan van de streaminggegevens. Voer in de spark-shell de volgende opdracht in:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Schrijf vervolgens de streaminggegevens naar de zojuist gemaakte tabel met behulp van de volgende opdracht:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Belangrijk

    De metastoreUri opties en database opties moeten momenteel handmatig worden ingesteld vanwege een bekend probleem in Apache Spark. Zie SPARK-25460 voor meer informatie over dit probleem.

  5. Ga terug naar de tweede SSH-sessie en voer de volgende waarden in:

    foo
    HiveSpark
    bar
    
  6. Ga terug naar de eerste SSH-sessie en noteer de korte activiteit. Gebruik de volgende opdracht om de gegevens weer te geven:

    hive.table("stream_table").show()
    

Gebruik Ctrl + C om te stoppen netcat op de tweede SSH-sessie. Gebruik :q dit om spark-shell af te sluiten bij de eerste SSH-sessie.

Volgende stappen