Operazioni di Apache Spark supportate da Hive Warehouse Connector in Azure HDInsight

Questo articolo illustra le operazioni basate su Spark supportate da Hive Warehouse Connector (HWC). Tutti gli esempi riportati di seguito verranno eseguiti tramite la shell Apache Spark.

Prerequisito

Completare la procedura di configurazione di Hive Warehouse Connector.

Introduzione

Per avviare una sessione della shell Spark, seguire questa procedura:

  1. Usare il comando ssh per connettersi al cluster Apache Spark. Modificare il comando seguente sostituendo CLUSTERNAME con il nome del cluster in uso e quindi immettere il comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Nella sessione ssh eseguire il comando seguente per prendere nota della versione di hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Modificare il codice seguente con la versione di hive-warehouse-connector-assembly identificata in precedenza. Eseguire quindi il comando per avviare la shell Spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Dopo l'avvio della shell Spark, è possibile avviare un'istanza di Hive Warehouse Connector usando i comandi seguenti:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Creazione di DataFrame Spark con query Hive

I risultati di tutte le query che usano la libreria HWC vengono restituiti come DataFrame. Gli esempi seguenti illustrano come creare una query Hive di base.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

I risultati della query sono DataFrame Spark, che possono essere usati con le librerie Spark come MLIB e SparkSQL.

Scrittura di DataFrame Spark nelle tabelle Hive

Spark non supporta in modo nativo la scrittura nelle tabelle ACID gestite di Hive. Tuttavia, usando HWC, è possibile scrivere qualsiasi dataframe in una tabella Hive. È possibile vedere come opera questa funzionalità nell'esempio seguente:

  1. Creare una tabella denominata sampletable_colorado e specificarne le colonne usando il comando seguente:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filtrare la tabella hivesampletable dove la colonna state è uguale a Colorado. Questa query hive restituisce un dataframe Spark e il risultato viene salvato nella tabella sampletable_colorado Hive usando la write funzione .

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Verificare i risultati con il comando seguente:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Operazioni di scrittura di flussi strutturati

Con Hive Warehouse Connector, è possibile usare i flussi Spark per scrivere dati nelle tabelle Hive.

Importante

Le operazioni di scrittura di flussi strutturati non sono supportate nei cluster Spark 4.0 abilitati per ESP.

Seguire la procedura descritta di seguito per inserire dati da un flusso Spark sulla porta localhost 9999 in una tabella Hive tramite Hive Warehouse Connector.

  1. Dalla shell Spark aperta, avviare un flusso Spark con il comando seguente:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Generare i dati per il flusso Spark creato, seguendo questa procedura:

    1. Aprire una seconda sessione SSH nello stesso cluster Spark.
    2. Al prompt dei comandi digitare nc -lk 9999. Questo comando usa l'utilità netcat per inviare dati dalla riga di comando alla porta specificata.
  3. Tornare alla prima sessione SSH e creare una nuova tabella Hive per conservare i dati di flusso. Nella shell Spark immettere il comando seguente:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Scrivere quindi i dati di flusso nella tabella appena creata usando il comando seguente:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Importante

    Attualmente è necessario impostare le opzioni metastoreUri e database in modo manuale a causa di un problema noto di Apache Spark. Per altre informazioni su questo problema, vedere SPARK-25460.

  5. Tornare alla seconda sessione SSH e immettere i valori seguenti:

    foo
    HiveSpark
    bar
    
  6. Tornare alla prima sessione SSH e notare la breve attività. Per visualizzare i dati, usare il comando seguente:

    hive.table("stream_table").show()
    

Usare CTRL+C per arrestare netcat la seconda sessione SSH. Usare :q per uscire dalla shell Spark nella prima sessione SSH.

Passaggi successivi