Opérations d’Apache Spark prises en charge par Hive Warehouse Connector dans Azure HDInsight

Cet article présente les opérations basées sur Spark prises en charge par Hive Warehouse Connector (HWC). Tous les exemples ci-dessous seront exécutés via l’interpréteur de commandes Apache Spark.

Configuration requise

Suivez les étapes de Configuration de Hive Warehouse Connector.

Prise en main

Pour démarrer une session spark-shell, procédez comme suit :

  1. Utilisez une commande ssh pour vous connecter à votre cluster Apache Spark. Modifiez la commande ci-dessous en remplaçant CLUSTERNAME par le nom de votre cluster, puis entrez la commande :

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. À partir de votre session ssh, exécutez la commande suivante pour noter la version de hive-warehouse-connector-assembly :

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Modifiez le code ci-dessous avec la version de hive-warehouse-connector-assembly identifiée ci-dessus. Exécutez ensuite la commande pour démarrer l’interpréteur de commandes Spark :

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Après avoir démarré l’interpréteur de commandes de Spark, une instance du connecteur d'entrepôt Hive peut être lancée en utilisant les commandes suivantes :

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Création de DataFrames Spark à partir de requêtes Hive

Les résultats de toutes les requêtes utilisant la bibliothèque HWC sont renvoyés sous forme de DataFrame. Les exemples suivants montrent comment créer une requête Hive de base.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Les résultats de la requête sont des DataFrames Spark, qui peuvent être utilisés avec les bibliothèques Spark comme MLIB et SparkSQL.

Écriture de DataFrames Spark dans des tables Hive

Spark ne prend pas en charge en mode natif l’écriture dans des tables ACID managées de Hive. HWC vous permet cependant d’écrire n’importe quel DataFrame dans une table Hive. Vous pouvez voir cette fonctionnalité en action dans l'exemple suivant :

  1. Créez une table appelée sampletable_colorado et spécifiez ses colonnes en utilisant la commande suivante :

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filtrez la table hivesampletable, où la colonne state est égale à Colorado. Cette requête Hive retourne un DataFrame Spark et le résultat est enregistré dans la table Hive sampletable_colorado à l’aide de la fonction write.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Visualisez les résultats avec la commande suivante :

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Écritures à l’aide du streaming structuré

Le connecteur d’entrepôt Hive vous permet d’utiliser le streaming Spark pour écrire des données dans des tables Hive.

Important

Les écritures de streaming structuré ne sont pas prises en charge dans les clusters Spark 4.0 à extension ESP.

Suivez les étapes ci-dessous pour ingérer les données d’un flux Spark sur le port localhost 9999 dans une table Hive. Hive Warehouse Connector.

  1. À partir de votre interpréteur de commandes Spark ouvert, démarrez un flux Spark à l’aide de la commande suivante :

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Procédez comme suit pour générer les données du flux Spark que vous avez créé :

    1. Ouvrez une deuxième session SSH sur le même cluster Spark.
    2. À l’invite de commandes, tapez nc -lk 9999. Cette commande utilise l’utilitaire netcat pour envoyer des données de la ligne de commande au port spécifié.
  3. Revenez à la première session SSH et créez une nouvelle table Hive pour contenir les données de streaming. Dans l’interpréteur de commandes de Spark, entrez la commande suivante :

    hive.createTable("stream_table").column("value","string").create()
    
  4. Puis, écrivez les données de streaming dans la table nouvellement créée à l’aide de la commande suivante :

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Important

    Les options metastoreUri et database doivent actuellement être définies manuellement en raison d'un problème connu dans Apache Spark. Pour plus d'informations sur ce problème, voir SPARK-25460.

  5. Revenez à la deuxième session SSH et entrez les valeurs suivantes :

    foo
    HiveSpark
    bar
    
  6. Revenez à la première session SSH et notez la brève activité. Pour afficher les données, utilisez la commande suivante :

    hive.table("stream_table").show()
    

Utilisez Ctrl + C pour arrêter netcat sur la deuxième session SSH. Utilisez :q pour quitter l’interpréteur de commandes de Spark sur la première session SSH.

Étapes suivantes