Las operaciones Apache Spark admitidas por el Hive Warehouse Connector en Azure HDInsight

En este artículo se muestran las operaciones basadas en Spark que admite Hive Warehouse Connector (HWC). Todos los ejemplos que se muestran a continuación se ejecutarán a través del shell de Apache Spark.

Requisito previo

Complete los pasos de configuración de Hive Warehouse Connector.

Introducción

Para iniciar una sesión de spark-shell, realice los pasos siguientes:

  1. Use el comando ssh para conectarse al clúster de Apache Spark. Modifique el comando siguiente: reemplace CLUSTERNAME por el nombre del clúster y, luego, escriba el comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Desde la sesión de ssh, ejecute el comando siguiente para anotar la versión hive-warehouse-connector-assembly:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Edite el código siguiente con la versión hive-warehouse-connector-assembly identificada anteriormente. Ejecute el siguiente comando para iniciar el shell de Spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Después de iniciar spark-shell, se puede iniciar una instancia del conector de Hive Warehouse mediante los siguientes comandos:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Creación de DataFrames de Spark a partir de consultas de Hive

Los resultados de todas las consultas mediante la biblioteca HWC se devuelven como un elemento DataFrame. En los ejemplos siguientes se muestra cómo crear una consulta de subárbol.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Los resultados de la consulta son DataFrames de Spark, que se pueden usar con bibliotecas de Spark como MLIB y SparkSQL.

Escritura de DataFrames de Spark en tablas de Hive

Spark no admite la escritura en tablas ACID administradas de Hive de forma nativa. Pero con HWC puede escribir cualquier elemento DataFrame en una tabla de Hive. Puede ver como funciona esta funcionalidad en el ejemplo siguiente:

  1. Cree una tabla denominada sampletable_colorado y especifique sus columnas con el comando siguiente:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filtre la tabla hivesampletable en que la columna state es igual a Colorado. Esta consulta de Hive devuelve un elemento DataFrame de Spark y el resultado se guarda en la tabla de Hive sampletable_colorado mediante la función write.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Vea los resultados con el siguiente comando:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Escrituras de flujos estructurados

Al usar el conector de Hive Warehouse, puede emplear los flujos de Spark para escribir datos en las tablas de Hive.

Importante

No se admiten las escrituras de streaming estructurado en clústeres de Spark 4.0 con ESP habilitado.

En el ejemplo se ingieren datos de una secuencia de Spark en el puerto de localhost 9999 en una tabla de Hive. Hive Warehouse Connector.

  1. Desde el shell de Spark abierto, inicie una secuencia de Spark con el siguiente comando:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Genere datos para el flujo de Spark que creó. Para ello, siga estos pasos:

    1. Abra una segunda sesión de SSH en el mismo clúster de Spark.
    2. En el símbolo del sistema, escriba nc -lk 9999. Este comando usa la utilidad netcat para enviar datos desde la línea de comandos al puerto especificado.
  3. Vuelva a la primera sesión de SSH y cree una nueva tabla de Hive para contener los datos de streaming. En spark-shell, escriba el siguiente comando:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Después, escriba los datos de streaming en la tabla recién creada mediante el siguiente comando:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Importante

    Actualmente, las opciones metastoreUri y database deben establecerse de forma manual debido a un problema conocido en Apache Spark. Para obtener más información acerca de este problema, consulte SPARK-25460.

  5. Vuelva a la segunda sesión de SSH y escriba los valores siguientes:

    foo
    HiveSpark
    bar
    
  6. Vuelva a la primera sesión de SSH y observe la breve actividad. Use el siguiente comando para ver los datos:

    hive.table("stream_table").show()
    

Use Ctrl + C para detener netcat en la segunda sesión de SSH. Use :q para salir de spark-shell en la primera sesión de SSH.

Pasos siguientes