Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel werden Spark-basierte Vorgänge erläutert, die vom Hive Warehouse Connector (HWC) unterstützt werden. Alle gezeigten Beispiele werden über die Apache Spark-Shell ausgeführt.
Voraussetzungen
Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.
Erste Schritte
Führen Sie die folgenden Schritte aus, um eine spark-shell-Sitzung zu starten:
Verwenden Sie den Befehl ssh, um eine Verbindung mit Ihrem Apache Spark-Cluster herzustellen. Bearbeiten Sie den Befehl, indem Sie CLUSTERNAME durch den Namen Ihres Clusters ersetzen, und geben Sie den Befehl dann ein:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Führen Sie in Ihrer SSH-Sitzung den folgenden Befehl aus, und notieren Sie sich die Version von
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Bearbeiten Sie den Code, indem Sie die oben ermittelte Version für
hive-warehouse-connector-assembly
verwenden. Führen Sie anschließend den Befehl aus, um die Spark-Shell zu starten:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Nachdem die Spark-Shell gestartet wurde, kann mit den folgenden Befehlen eine Hive Warehouse Connector-Instanz gestartet werden:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Erstellen von Spark-Datenrahmen mithilfe von Hive-Abfragen
Die Ergebnisse aller Abfragen mit Verwendung der HWC-Bibliothek werden als Datenrahmen zurückgegeben. In den folgenden Beispielen wird veranschaulicht, wie Sie eine einfache Hive-Abfrage erstellen.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Bei den Ergebnissen der Abfrage handelt es sich um Spark-Datenrahmen, die mit Spark-Bibliotheken wie MLIB und SparkSQL verwendet werden können.
Schreiben von Spark-Datenrahmen in Hive-Tabellen
Spark bietet keine native Unterstützung für das Schreiben in verwaltete ACID-Tabellen von Hive. Mithilfe von HWC können Sie jedoch alle Datenrahmen in eine Hive-Tabelle schreiben. Diese Funktionalität wird im folgenden Beispiel veranschaulicht:
Erstellen Sie eine Tabelle mit dem Namen
sampletable_colorado
, und geben Sie Spalten dafür an, indem Sie den folgenden Befehl verwenden:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Filtern Sie die Tabelle
hivesampletable
, in der die Spaltestate
den EintragColorado
enthält. Diese Hive-Abfrage gibt einen Spark-Datenrahmen zurück, und das Ergebnis wird mit der Funktionwrite
in der Hive-Tabellesampletable_colorado
gespeichert.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Zeigen Sie die Ergebnisse mit dem folgenden Befehl an:
hive.table("sampletable_colorado").show()
Schreibvorgänge per strukturiertem Stream
Mit Hive Warehouse Connector können Sie das Spark-Streaming nutzen, um Daten in Hive-Tabellen zu schreiben.
Wichtig
Strukturierte Streamingschreibvorgänge werden in ESP-fähigen Spark 4.0-Clustern nicht unterstützt.
Mithilfe der Schritte können Sie Daten aus einem Spark-Stream vom Localhost-Port 9999 abrufen und als Hive-Tabelle formatieren. (über den Hive Warehouse Connector).
Starten Sie in der geöffneten Spark-Shell mit dem folgenden Befehl einen Spark-Stream:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Generieren Sie Daten für den von Ihnen erstellten Spark-Stream, indem Sie die folgenden Schritte ausführen:
- Richten Sie im selben Spark-Cluster eine zweite SSH-Sitzung ein.
- Geben Sie an der Eingabeaufforderung
nc -lk 9999
ein: Bei diesem Befehl wird das Hilfsprogrammnetcat
verwendet, um Daten über die Befehlszeile an den angegebenen Port zu senden.
Kehren Sie zur ersten SSH-Sitzung zurück, und erstellen Sie für die Streamingdaten eine neue Hive-Tabelle. Geben Sie in spark-shell den folgenden Befehl ein:
hive.createTable("stream_table").column("value","string").create()
Schreiben Sie anschließend die Streamingdaten mit dem folgenden Befehl in die neu erstellte Tabelle:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Wichtig
Aufgrund eines bekannten Problems in Apache Spark müssen die Optionen
metastoreUri
unddatabase
derzeit manuell festgelegt werden. Weitere Informationen zu diesem Problem finden Sie unter SPARK-25460.Kehren Sie zur zweiten SSH-Sitzung zurück, und geben Sie die folgenden Werte ein:
foo HiveSpark bar
Kehren Sie zur ersten SSH-Sitzung zurück, und beachten Sie die kurze Aktivität. Verwenden Sie zum Anzeigen der Daten den folgenden Befehl:
hive.table("stream_table").show()
Drücken Sie STRG+C, um netcat
in der zweiten SSH-Sitzung zu beenden. Verwenden Sie :q
, um spark-shell in der ersten SSH-Sitzung zu beenden.