Azure HDInsight の Hive Warehouse Connector でサポートされる Apache Spark の操作

この記事では、Hive Warehouse Connector (HWC) でサポートされる Spark ベースの操作について説明します。 次に示すすべての例は、Apache Spark シェルを通じて実行されます。

前提条件

Hive Warehouse Connector の設定ステップを完了します。

作業の開始

spark-shell セッションを開始するには、次の手順を実行します。

  1. ssh コマンドを使用して Apache Spark クラスターに接続します。 次のコマンドを編集して CLUSTERNAME をクラスターの名前に置き換えてから、そのコマンドを入力します。

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. SSH セッションから次のコマンドを実行して、hive-warehouse-connector-assembly のバージョンを確認します。

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. 上で特定した hive-warehouse-connector-assembly のバージョンを使用して、次のコードを編集します。 次のコマンドを実行して、spark シェルを起動します。

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. spark-shell の起動後、次のコマンドを使用して Hive Warehouse Connector インスタンスを開始できます。

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Hive クエリを使用した Spark DataFrame の作成

HWC ライブラリを使用したすべてのクエリの結果は、DataFrame として返されます。 次の例では、基本的な Hive クエリを作成する方法を示します。

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

クエリの結果は Spark DataFrame です。これは、MLIB や SparkSQL のような Spark ライブラリと共に使用できます。

Spark DataFrame から Hive テーブルへの書き出し

Spark では、Hive によって管理される ACID テーブルへの書き込みはネイティブにサポートされていません。 ただし、HWC を使用すると、どの DataFrame も Hive テーブルに書き出すことができます。 次の例では、この機能の動作を確認できます。

  1. 次のコマンドを使用し、sampletable_colorado というテーブルを作成してその列を指定します。

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. テーブル hivesampletable をフィルターします (列 state = Colorado)。 この Hive クエリからは Spark DataFrame が返され、結果は write 関数を使用して Hive テーブル sampletable_colorado に保存されます。

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. 次のコマンドを使用して結果を表示します。

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

構造化ストリーミングの書き込み

Hive Warehouse Connector を使用すると、Spark ストリーミングを使って Hive テーブルにデータを書き込むことができます。

重要

構造化ストリーミングの書き込みは、ESP が有効になっている Spark 4.0 クラスターではサポートされていません。

localhost ポート 9999 の Spark ストリームから Hive テーブルにデータを取り込むには (Hive Warehouse Connector 経由)、次の手順のようにします。

  1. 開いている Spark シェルから、次のコマンドを使用して Spark ストリームを開始します。

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. 次の手順を実行して、作成した Spark ストリームのためのデータを生成します。

    1. 同じ Spark クラスターで2番目の SSH セッションを開きます。
    2. コマンド プロンプトで、「nc -lk 9999」と入力します。 このコマンドでは、netcat ユーティリティを使用して、コマンド ラインから指定のポートにデータを送信します。
  3. 最初の SSH セッションに戻り、ストリーミング データを保持する新しい Hive テーブルを作成します。 spark-shell で、次のコマンドを入力します。

    hive.createTable("stream_table").column("value","string").create()
    
  4. その後、次のコマンドを使用して、新しく作成したテーブルにストリーミング データを書き込みます。

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    重要

    現在、metastoreUri および database オプションは、Apache Spark の既知の問題のため、手動で設定する必要があります。 この問題の詳細については、SPARK-25460 を参照してください。

  5. 2番目の SSH セッションに戻り、次の値を入力します。

    foo
    HiveSpark
    bar
    
  6. 最初の SSH セッションに戻り、この簡単なアクティビティに注目します。 次のコマンドを使用して、データを表示します。

    hive.table("stream_table").show()
    

2 番目の SSH セッションで、Ctrl + C を使用して netcat を停止します。 最初の SSH セッションで、:q を使用して spark-shell を終了します。

次のステップ