Partilhar via


Operações do Apache Spark suportadas pelo Hive Warehouse Connector no Azure HDInsight

Este artigo mostra operações baseadas em faísca suportadas pelo Hive Warehouse Connector (HWC). Todos os exemplos mostrados abaixo serão executados através do shell Apache Spark.

Pré-requisito

Conclua as etapas de configuração do Hive Warehouse Connector.

Introdução

Para iniciar uma sessão de spark-shell, execute as seguintes etapas:

  1. Use o comando ssh para se conectar ao cluster do Apache Spark. Edite o comando abaixo substituindo CLUSTERNAME pelo nome do cluster e digite o comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Na sessão ssh, execute o seguinte comando para anotar a hive-warehouse-connector-assembly versão:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Edite o código abaixo com a versão identificada hive-warehouse-connector-assembly acima. Em seguida, execute o comando para iniciar o shell de faísca:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Depois de iniciar o spark-shell, uma instância do Hive Warehouse Connector pode ser iniciada usando os seguintes comandos:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Criando DataFrames do Spark usando consultas do Hive

Os resultados de todas as consultas usando a biblioteca HWC são retornados como um DataFrame. Os exemplos a seguir demonstram como criar uma consulta de hive básica.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Os resultados da consulta são Spark DataFrames, que podem ser usados com bibliotecas Spark como MLIB e SparkSQL.

Escrevendo DataFrames do Spark em tabelas do Hive

O Spark não suporta nativamente a gravação nas tabelas ACID gerenciadas do Hive. No entanto, usando HWC, você pode gravar qualquer DataFrame em uma tabela Hive. Você pode ver essa funcionalidade em ação no exemplo a seguir:

  1. Crie uma tabela chamada sampletable_colorado e especifique suas colunas usando o seguinte comando:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filtre a tabela hivesampletable onde a coluna state é Coloradoigual a . Essa consulta de hive retorna um Spark DataFrame e o resultado é salvo na tabela sampletable_colorado Hive usando a write função.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Veja os resultados com o seguinte comando:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Gravações de streaming estruturadas

Usando o Hive Warehouse Connector, você pode usar o streaming do Spark para gravar dados em tabelas do Hive.

Importante

Não há suporte para gravações de streaming estruturadas em clusters Spark 4.0 habilitados para ESP.

Siga as etapas abaixo para ingerir dados de um fluxo do Spark na porta localhost 9999 em uma tabela Hive via. Conector de armazém do Hive.

  1. A partir do shell aberto do Spark, inicie um fluxo de faísca com o seguinte comando:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Gere dados para o fluxo do Spark que você criou, executando as seguintes etapas:

    1. Abra uma segunda sessão SSH no mesmo cluster do Spark.
    2. No prompt de comando, digite nc -lk 9999. Este comando usa o netcat utilitário para enviar dados da linha de comando para a porta especificada.
  3. Retorne à primeira sessão SSH e crie uma nova tabela do Hive para armazenar os dados de streaming. No spark-shell, digite o seguinte comando:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Em seguida, escreva os dados de streaming na tabela recém-criada usando o seguinte comando:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Importante

    As metastoreUri opções e database devem ser definidas manualmente devido a um problema conhecido no Apache Spark. Para obter mais informações sobre esse problema, consulte SPARK-25460.

  5. Retorne à segunda sessão SSH e insira os seguintes valores:

    foo
    HiveSpark
    bar
    
  6. Volte para a primeira sessão SSH e anote a breve atividade. Use o seguinte comando para exibir os dados:

    hive.table("stream_table").show()
    

Use Ctrl + C para parar netcat na segunda sessão SSH. Use :q para sair do spark-shell na primeira sessão SSH.

Próximos passos