Hyperspace では、Apache Spark ユーザーがデータセット (CSV、JSON、Parquet など) にインデックスを作成し、クエリやワークロードの高速化を期待してそれらを使用できるようになりました。
この記事では、Hyperspace の基本操作を明確に示し、そのシンプルさに焦点を当て、ほぼ誰でもそれを使用できる方法について説明します。
免責事項:Hyperspace は、次の 2 つの状況下でワークロードまたはクエリを高速化するのに役立ちます。
- クエリに、選択度の高い述語に対するフィルターが含まれている。 100 万の候補行から一致する行を 100 行選択する場合などです。
- クエリに、負荷の大きいシャッフルを必要とする結合が含まれている。 100 GB のデータセットを 10 GB のデータセットに結合する場合などです。
ケースバイケースで、ワークロードを注意深く監視して、インデックスの作成が役に立っているかどうかを確認することをお勧めします。
このドキュメントは ノートブック形式 (Python 用、C# 用、Scala 用) でも使用できます。
セットアップ
Note
Hyperspace は、Azure Synapse Runtime for Apache Spark 3.1 (サポート対象外)、および Azure Synapse Runtime for Apache Spark 3.2 (サポート終了発表済み) でサポートされています。 ただし、Hyperspace は Azure Synapse Runtime for Apache Spark 3.3 (GA) ではサポートされていないことに留意してください。
まず、新しい Spark セッションを開始します。 このドキュメントは Hyperspace で提供できる機能を説明するだけのチュートリアルであるため、小さなデータセット上で Hyperspace が実行している処理を明確に説明できるように構成の変更を行います。
既定では、結合の一方の側のデータ サイズが小さい場合 (このチュートリアルで使用するサンプル データの場合)、Spark ではブロードキャスト結合を使用して結合のクエリを最適化します。 そのため、後で結合のクエリを実行するときに、Spark で並べ替えマージ結合が使用されるように、ブロードキャスト結合を無効にします。 これは主に、結合のクエリを高速化するために Hyperspace のインデックスを規模に応じて使用する方法を示すためのものです。
次のセルを実行した結果の出力には、正常に作成された Spark セッションへの参照が表示され、変更後の結合の構成の値として "-1" が出力されます。これは、ブロードキャスト結合が正常に無効になったことを示しています。
// Start your Spark session
spark
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Verify that BroadcastHashJoin is set correctly
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
# Start your Spark session.
spark
# Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Verify that BroadcastHashJoin is set correctly
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.Conf().Set("spark.sql.autoBroadcastJoinThreshold", -1);
// Verify that BroadcastHashJoin is set correctly.
Console.WriteLine(spark.Conf().Get("spark.sql.autoBroadcastJoinThreshold"));
結果は次のようになります。
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@297e957d
-1
データの準備
環境を準備するには、サンプル データ レコードを作成して、Parquet データ ファイルとして保存します。 実例には Parquet が使用されていますが、CSV などの他の形式を使用することもできます。 以降のセルでは、このサンプル データセットに Hyperspace のインデックスをいくつか作成し、クエリの実行時に Spark でそれらを使用できるようにする方法を確認できます。
このサンプル レコードは、部門 (department) と従業員 (employee) という 2 つのデータセットに対応しています。 生成されたデータ ファイルを保存するには、ストレージ アカウントで目的の場所を指すように "emp_Location" と "dept_Location" の各パスを構成する必要があります。
次のセルを実行した結果の出力には、データセットの内容が 3 行 1 組のリストとして表示され、その後に各データセットの内容を適切な場所に保存するために作成されたデータフレームへの参照が表示されます。
import org.apache.spark.sql.DataFrame
// Sample department records
val departments = Seq(
(10, "Accounting", "New York"),
(20, "Research", "Dallas"),
(30, "Sales", "Chicago"),
(40, "Operations", "Boston"))
// Sample employee records
val employees = Seq(
(7369, "SMITH", 20),
(7499, "ALLEN", 30),
(7521, "WARD", 30),
(7566, "JONES", 20),
(7698, "BLAKE", 30),
(7782, "CLARK", 10),
(7788, "SCOTT", 20),
(7839, "KING", 10),
(7844, "TURNER", 30),
(7876, "ADAMS", 20),
(7900, "JAMES", 30),
(7934, "MILLER", 10),
(7902, "FORD", 20),
(7654, "MARTIN", 30))
// Save sample data in the Parquet format
import spark.implicits._
val empData: DataFrame = employees.toDF("empId", "empName", "deptId")
val deptData: DataFrame = departments.toDF("deptId", "deptName", "location")
val emp_Location: String = "/<yourpath>/employees.parquet" //TODO ** customize this location path **
val dept_Location: String = "/<yourpath>/departments.parquet" //TODO ** customize this location path **
empData.write.mode("overwrite").parquet(emp_Location)
deptData.write.mode("overwrite").parquet(dept_Location)
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
# Sample department records
departments = [(10, "Accounting", "New York"), (20, "Research", "Dallas"), (30, "Sales", "Chicago"), (40, "Operations", "Boston")]
# Sample employee records
employees = [(7369, "SMITH", 20), (7499, "ALLEN", 30), (7521, "WARD", 30), (7566, "JONES", 20), (7698, "BLAKE", 30)]
# Create a schema for the dataframe
dept_schema = StructType([StructField('deptId', IntegerType(), True), StructField('deptName', StringType(), True), StructField('location', StringType(), True)])
emp_schema = StructType([StructField('empId', IntegerType(), True), StructField('empName', StringType(), True), StructField('deptId', IntegerType(), True)])
departments_df = spark.createDataFrame(departments, dept_schema)
employees_df = spark.createDataFrame(employees, emp_schema)
#TODO ** customize this location path **
emp_Location = "/<yourpath>/employees.parquet"
dept_Location = "/<yourpath>/departments.parquet"
employees_df.write.mode("overwrite").parquet(emp_Location)
departments_df.write.mode("overwrite").parquet(dept_Location)
using Microsoft.Spark.Sql.Types;
// Sample department records
var departments = new List<GenericRow>()
{
new GenericRow(new object[] {10, "Accounting", "New York"}),
new GenericRow(new object[] {20, "Research", "Dallas"}),
new GenericRow(new object[] {30, "Sales", "Chicago"}),
new GenericRow(new object[] {40, "Operations", "Boston"})
};
// Sample employee records
var employees = new List<GenericRow>() {
new GenericRow(new object[] {7369, "SMITH", 20}),
new GenericRow(new object[] {7499, "ALLEN", 30}),
new GenericRow(new object[] {7521, "WARD", 30}),
new GenericRow(new object[] {7566, "JONES", 20}),
new GenericRow(new object[] {7698, "BLAKE", 30}),
new GenericRow(new object[] {7782, "CLARK", 10}),
new GenericRow(new object[] {7788, "SCOTT", 20}),
new GenericRow(new object[] {7839, "KING", 10}),
new GenericRow(new object[] {7844, "TURNER", 30}),
new GenericRow(new object[] {7876, "ADAMS", 20}),
new GenericRow(new object[] {7900, "JAMES", 30}),
new GenericRow(new object[] {7934, "MILLER", 10}),
new GenericRow(new object[] {7902, "FORD", 20}),
new GenericRow(new object[] {7654, "MARTIN", 30})
};
// Save sample data in the Parquet format
var departmentSchema = new StructType(new List<StructField>()
{
new StructField("deptId", new IntegerType()),
new StructField("deptName", new StringType()),
new StructField("location", new StringType())
});
var employeeSchema = new StructType(new List<StructField>()
{
new StructField("empId", new IntegerType()),
new StructField("empName", new StringType()),
new StructField("deptId", new IntegerType())
});
DataFrame empData = spark.CreateDataFrame(employees, employeeSchema);
DataFrame deptData = spark.CreateDataFrame(departments, departmentSchema);
string emp_Location = "/<yourpath>/employees.parquet"; //TODO ** customize this location path **
string dept_Location = "/<yourpath>/departments.parquet"; //TODO ** customize this location path **
empData.Write().Mode("overwrite").Parquet(emp_Location);
deptData.Write().Mode("overwrite").Parquet(dept_Location);
結果は次のようになります。
departments: Seq[(Int, String, String)] = List((10,Accounting,New York), (20,Research,Dallas), (30,Sales,Chicago), (40,Operations,Boston))
employees: Seq[(Int, String, Int)] = List((7369,SMITH,20), (7499,ALLEN,30), (7521,WARD,30), (7566,JONES,20), (7698,BLAKE,30), (7782,CLARK,10), (7788,SCOTT,20), (7839,KING,10), (7844,TURNER,30), (7876,ADAMS,20), (7900,JAMES,30), (7934,MILLER,10), (7902,FORD,20), (7654,MARTIN,30))
empData: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
emp_Location: String = /your-path/employees.parquet
dept_Location: String = /your-path/departments.parquet
作成した Parquet ファイルの内容を確認して、予想されるレコードが正しい形式で含まれているかどうか確かめましょう。 後で、これらのデータ ファイルを使用して Hyperspace のインデックスを作成し、サンプル クエリを実行します。
次のセルを実行すると、従業員および部門データフレームの行を表形式で表示する出力が生成されます。 14 人の従業員と 4 つの部門があり、それぞれが前のセルで作成した 3 行 1 組 のいずれかと一致しているはずです。
// emp_Location and dept_Location are the user defined locations above to save parquet files
val empDF: DataFrame = spark.read.parquet(emp_Location)
val deptDF: DataFrame = spark.read.parquet(dept_Location)
// Verify the data is available and correct
empDF.show()
deptDF.show()
# emp_Location and dept_Location are the user-defined locations above to save parquet files
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
# Verify the data is available and correct
emp_DF.show()
dept_DF.show()
// emp_Location and dept_Location are the user-defined locations above to save parquet files
DataFrame empDF = spark.Read().Parquet(emp_Location);
DataFrame deptDF = spark.Read().Parquet(dept_Location);
// Verify the data is available and correct
empDF.Show();
deptDF.Show();
結果は次のようになります。
empDF: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDF: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|EmpId|EmpName|DeptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
| 7900| JAMES| 30|
| 7934| MILLER| 10|
| 7839| KING| 10|
| 7566| JONES| 20|
| 7698| BLAKE| 30|
| 7782| CLARK| 10|
| 7788| SCOTT| 20|
| 7902| FORD| 20|
| 7654| MARTIN| 30|
|DeptId| DeptName|Location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
インデックス
Hyperspace では、永続的なデータ ファイルからスキャンされたレコードにインデックスを作成できます。 正常に作成されると、インデックスに対応するエントリが Hyperspace のメタデータに追加されます。 このメタデータは、後でクエリ処理中に適切なインデックスを検索して使用するために、(拡張機能を使用して) Apache Spark のオプティマイザーによって使用されます。
インデックスが作成された後、いくつかのアクションを実行できます。
- 基になるデータが変更された場合に最新の情報に更新する。 既存のインデックスを最新の情報に更新して変更をキャプチャできます。
- インデックスが必要ない場合に削除する。 論理的な削除を実行できます。つまり、インデックスは物理的には削除されませんが、"削除済み" とマークされるので、ワークロードで使用されなくなります。
- インデックスが不要になった場合にバキュームする。 インデックスをバキュームすることができます。これにより、インデックスの内容と関連するメタデータが Hyperspace のメタデータから物理的に完全に削除されます。
最新の情報に更新。基になるデータが変更された場合に、それをキャプチャできるように既存のインデックスを最新の情報に更新します。 削除。インデックスが必要ない場合に、論理的な削除を実行できます。つまり、インデックスは物理的には削除されませんが、"削除済み" とマークされるので、ワークロードで使用されなくなります。
以降のセクションでは、このようなインデックスの管理操作を Hyperspace で行う方法について説明します。
最初に、必要なライブラリをインポートし、Hyperspace のインスタンスを作成する必要があります。 後で、このインスタンスを使用してさまざまな Hyperspace API を呼び出し、サンプル データにインデックスを作成したり、それらのインデックスを変更したりします。
次のセルを実行した結果の出力には、作成された Hyperspace インスタンスへの参照が表示されます。
// Create an instance of Hyperspace
import com.microsoft.hyperspace._
val hyperspace: Hyperspace = Hyperspace()
from hyperspace import *
# Create an instance of Hyperspace
hyperspace = Hyperspace(spark)
// Create an instance of Hyperspace
using Microsoft.Spark.Extensions.Hyperspace;
Hyperspace hyperspace = new Hyperspace(spark);
結果は次のようになります。
hyperspace: com.microsoft.hyperspace.Hyperspace = com.microsoft.hyperspace.Hyperspace@1432f740
[インデックスを作成する]
Hyperspace のインデックスを作成するには、次の 2 つの情報を指定する必要があります。
- インデックスが作成されるデータを参照する Spark データフレーム。
- インデックス構成オブジェクト、IndexConfig。インデックスのインデックス名、インデックス付き列、および付加列を指定するものです。
まず、サンプル データに Hyperspace のインデックスを 3 つ ("deptIndex1" および "deptIndex2" という名前の部門データセットに 2 つのインデックス、"empIndex" という名前の従業員データセットに 1 つのインデックス) 作成します。 インデックスごとに、その名前と、インデックス付き列と付加列の列リストをキャプチャするための対応する IndexConfig が必要です。 次のセルを実行すると、これらの IndexConfig が作成され、出力にそれらが一覧表示されます。
Note
インデックス付き列は、フィルターまたは結合条件に表示される列です。 付加列は、ご自分の選択/プロジェクトに表示される列です。
たとえば、次のクエリについて考えてみましょう。
SELECT X
FROM T
WHERE Y = 2
Y にはインデックス付き列を指定でき、X には付加列を指定できます。
// Create index configurations
import com.microsoft.hyperspace.index.IndexConfig
val empIndexConfig: IndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
val deptIndexConfig1: IndexConfig = IndexConfig("deptIndex1", Seq("deptId"), Seq("deptName"))
val deptIndexConfig2: IndexConfig = IndexConfig("deptIndex2", Seq("location"), Seq("deptName"))
# Create index configurations
emp_IndexConfig = IndexConfig("empIndex1", ["deptId"], ["empName"])
dept_IndexConfig1 = IndexConfig("deptIndex1", ["deptId"], ["deptName"])
dept_IndexConfig2 = IndexConfig("deptIndex2", ["location"], ["deptName"])
using Microsoft.Spark.Extensions.Hyperspace.Index;
var empIndexConfig = new IndexConfig("empIndex", new string[] {"deptId"}, new string[] {"empName"});
var deptIndexConfig1 = new IndexConfig("deptIndex1", new string[] {"deptId"}, new string[] {"deptName"});
var deptIndexConfig2 = new IndexConfig("deptIndex2", new string[] {"location"}, new string[] {"deptName"});
結果は次のようになります。
empIndexConfig: com.microsoft.hyperspace.index.IndexConfig = [indexName: empIndex; indexedColumns: deptid; includedColumns: empname]
deptIndexConfig1: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex1; indexedColumns: deptid; includedColumns: deptname]
deptIndexConfig2: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex2; indexedColumns: location; includedColumns: deptname]
次に、インデックス構成を使用して 3 つのインデックスを作成します。 このために、Hyperspace のインスタンスに対して "createIndex" コマンドを呼び出します。 このコマンドでは、インデックス構成と、インデックスが作成される行を含むデータフレームが必要です。 次のセルを実行すると、3 つのインデックスが作成されます。
// Create indexes from configurations
import com.microsoft.hyperspace.index.Index
hyperspace.createIndex(empDF, empIndexConfig)
hyperspace.createIndex(deptDF, deptIndexConfig1)
hyperspace.createIndex(deptDF, deptIndexConfig2)
# Create indexes from configurations
hyperspace.createIndex(emp_DF, emp_IndexConfig)
hyperspace.createIndex(dept_DF, dept_IndexConfig1)
hyperspace.createIndex(dept_DF, dept_IndexConfig2)
// Create indexes from configurations
hyperspace.CreateIndex(empDF, empIndexConfig);
hyperspace.CreateIndex(deptDF, deptIndexConfig1);
hyperspace.CreateIndex(deptDF, deptIndexConfig2);
インデックスを一覧表示する
次のコードは、Hyperspace のインスタンスで使用可能なすべてのインデックスを一覧表示する方法を示しています。 ここでは、既存のインデックスに関する情報を Spark データフレームとして返す "indexes" API を使用するため、その他の操作を実行できます。
たとえば、このデータフレームに対して有効な操作を呼び出して、その内容を確認したり、それをさらに分析したり (特定のインデックスをフィルター処理したり、何らかの望ましいプロパティに従ってグループ化したりするなど) できます。
次のセルでは、データフレームの "show" アクションを使用してすべての行を出力し、インデックスの詳細を表形式で表示します。 インデックスごとに、Hyperspace によってメタデータに格納された、それに関するすべての情報を確認できます。 次のことがすぐにおわかりいただけます。
- config.indexName、config.indexedColumns、config.includedColumns、status.status は、ユーザーが通常参照するフィールドです。
- dfSignature は、Hyperspace によって自動的に生成され、インデックスごとに一意です。 Hyperspace では、このシグネチャを内部的に使用してインデックスを維持し、クエリ時にそれを利用します。
次の出力では、3 つのすべてのインデックスの状態が "ACTIVE" であり、その名前、インデックス付き列、および付加列が上記のインデックス構成で定義したものと一致しているはずです。
hyperspace.indexes.show
hyperspace.indexes().show()
hyperspace.Indexes().Show();
結果は次のようになります。
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
インデックスを削除する
既存のインデックスを削除するには、"deleteIndex" API を使用して、インデックス名を指定します。 インデックスの削除では論理的な削除が行われます。主に、Hyperspace のメタデータ内のインデックスの状態が "ACTIVE" から "DELETED" に更新されます。 これにより、削除されたインデックスが今後のすべてのクエリ最適化から除外され、Hyperspace ではどのクエリに対してもそのインデックスが選択されなくなります。
ただし、(論理的な削除であるため) 削除されたインデックスのインデックス ファイルは引き続き使用可能なままになっているため、ユーザーが望めば、そのインデックスを復元することが可能です。
次のセルでは、"deptIndex2" という名前のインデックスを削除し、その後に Hyperspace のメタデータを一覧表示します。 出力は上記の「インデックスを一覧表示する」のセルと似ていますが、"deptIndex2" の状態が "DELETED" に変更されているはずです。
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex2");
hyperspace.Indexes().Show();
結果は次のようになります。
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
インデックスを復元する
"restoreIndex" API を使用して、削除されたインデックスを復元できます。 これにより、最新バージョンのインデックスが ACTIVE 状態に戻り、クエリで再び使用できるようになります。 次のセルは、"restoreIndex" の使用例を示しています。 "deptIndex1" を削除してから復元します。 出力では、最初に "deleteIndex" コマンドの呼び出し後に "deptIndex1" が "DELETED" 状態になり、"restoreIndex" の呼び出し後に "ACTIVE" 状態に戻ったことを示しています。
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex1");
hyperspace.Indexes().Show();
hyperspace.RestoreIndex("deptIndex1");
hyperspace.Indexes().Show();
結果は次のようになります。
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.indexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
インデックスをバキュームする
vacuumIndex コマンドを使用して、物理的な削除 (削除されたインデックスのファイルとメタデータ エントリを完全に削除すること) を実行できます。 この操作は、元に戻すことはできません。 これにより、すべてのインデックス ファイルが物理的に削除されます (物理的な削除であるのはこのためです)。
次のセルは、"deptIndex2" インデックスをバキュームし、バキューム後の Hyperspace のメタデータを示しています。 どちらも "ACTIVE" 状態になっている "deptIndex1" と "empIndex" の 2 つのインデックスのメタデータ エントリはありますが、"deptIndex2" のエントリはないことがおわかりいただけるはずです。
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.VacuumIndex("deptIndex2");
hyperspace.Indexes().Show();
結果は次のようになります。
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Hyperspace を有効または無効にする
Hyperspace には、Spark を用いてインデックスの使用を有効または無効にする API が備わっています。
- enableHyperspace コマンドを使用すると、Hyperspace の最適化ルールが Spark のオプティマイザーから見えるようになり、Hyperspace の既存のインデックスを利用してユーザーのクエリが最適化されます。
- disableHyperspace コマンドを使用すると、クエリの最適化中に Hyperspace のルールが適用されなくなります。 Hyperspace を無効にしても、作成されたインデックスは元の状態のままなので、影響を受けません。
次のセルは、これらのコマンドを使用して、Hyperspace を有効または無効にする方法を示しています。 出力には、構成が更新された既存の Spark セッションへの参照が表示されます。
// Enable Hyperspace
spark.enableHyperspace
// Disable Hyperspace
spark.disableHyperspace
# Enable Hyperspace
Hyperspace.enable(spark)
# Disable Hyperspace
Hyperspace.disable(spark)
// Enable Hyperspace
spark.EnableHyperspace();
// Disable Hyperspace
spark.DisableHyperspace();
結果は次のようになります。
res48: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
res51: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
インデックスの使用量
クエリの処理中に Spark で Hyperspace のインデックスが使用されるようにするには、Hyperspace が有効になっていることを確認する必要があります。
次のセルでは、Hyperspace を有効にし、サンプル クエリの実行に使用する、サンプル データ レコードを含む 2 つのデータフレームを作成します。 データフレームごとに、いくつかのサンプル行が出力されます。
// Enable Hyperspace
spark.enableHyperspace
val empDFrame: DataFrame = spark.read.parquet(emp_Location)
val deptDFrame: DataFrame = spark.read.parquet(dept_Location)
empDFrame.show(5)
deptDFrame.show(5)
# Enable Hyperspace
Hyperspace.enable(spark)
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
emp_DF.show(5)
dept_DF.show(5)
// Enable Hyperspace
spark.EnableHyperspace();
DataFrame empDFrame = spark.Read().Parquet(emp_Location);
DataFrame deptDFrame = spark.Read().Parquet(dept_Location);
empDFrame.Show(5);
deptDFrame.Show(5);
結果は次のようになります。
res53: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.Spark™Session@39fe1ddb
empDFrame: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDFrame: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|empId|empName|deptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
上位 5 行のみが表示されます
|deptId| deptName|location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
インデックスの種類
Hyperspace には現在、次の 2 つのクエリ グループにインデックスを利用するためのルールがあります。
- 検索または範囲選択のフィルター述語を含む選択クエリ。
- 等価結合述語 (等結合) を含む結合クエリ。
フィルターを高速化するためのインデックス
クエリの最初の例では、次のセルに示されているように、部門レコードに対して検索を行っています。 SQL では、このクエリは次の例のようになります。
SELECT deptName
FROM departments
WHERE deptId = 20
次のセルを実行した結果の出力には、次のものが表示されます。
- クエリの結果 (1 つの部門名)。
- クエリを実行するために Spark で使用されたクエリ プラン。
クエリ プランでは、プランの下部にある FileScan 演算子に、レコードの読み取り元のデータソースが表示されます。 このファイルの場所は、"deptIndex1" インデックスの最新バージョンへのパスを示しています。 この情報は、クエリに従い、Hyperspace の最適化ルールを使用して、Spark で実行時に適切なインデックスが利用されることになったことを示しています。
// Filter with equality predicate
val eqFilter: DataFrame = deptDFrame.filter("deptId = 20").select("deptName")
eqFilter.show()
eqFilter.explain(true)
# Filter with equality predicate
eqFilter = dept_DF.filter("""deptId = 20""").select("""deptName""")
eqFilter.show()
eqFilter.explain(True)
DataFrame eqFilter = deptDFrame.Filter("deptId = 20").Select("deptName");
eqFilter.Show();
eqFilter.Explain(true);
結果は次のようになります。
eqFilter: org.apache.spark.sql.DataFrame = [deptName: string]
|DeptName|
|--------|
|Research|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), EqualTo(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
2 番目の例は、部門レコードに対する範囲選択クエリです。 SQL では、このクエリは次の例のようになります。
SELECT deptName
FROM departments
WHERE deptId > 20
最初の例と同様に、次のセルの出力には、クエリの結果 (2 つの部門の名前) とクエリ プランが表示されます。 FileScan 演算子に含まれるデータ ファイルの場所は、クエリの実行に "deptIndex1" が使用されたことを示しています。
// Filter with range selection predicate
val rangeFilter: DataFrame = deptDFrame.filter("deptId > 20").select("deptName")
rangeFilter.show()
rangeFilter.explain(true)
# Filter with range selection predicate
rangeFilter = dept_DF.filter("""deptId > 20""").select("deptName")
rangeFilter.show()
rangeFilter.explain(True)
// Filter with range selection predicate
DataFrame rangeFilter = deptDFrame.Filter("deptId > 20").Select("deptName");
rangeFilter.Show();
rangeFilter.Explain(true);
結果は次のようになります。
rangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|----------|
|Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
3 番目の例は、部門 ID で部門と従業員のレコードを結合するクエリです。 同等の SQL ステートメントを次に示します。
SELECT employees.deptId, empName, departments.deptId, deptName
FROM employees, departments
WHERE employees.deptId = departments.deptId
次のセルを実行した結果の出力には、クエリの結果 (14 人の従業員の名前と各従業員が働いている部門の名前) が表示されます。 出力にはクエリ プランも含まれています。 2 つの FileScan 演算子のファイルの場所で、Spark がクエリの実行に "empIndex" と "deptIndex1" のインデックスを使用したことがどのように示されているかに注意してください。
// Join
val eqJoin: DataFrame =
empDFrame.
join(deptDFrame, empDFrame("deptId") === deptDFrame("deptId")).
select(empDFrame("empName"), deptDFrame("deptName"))
eqJoin.show()
eqJoin.explain(true)
# Join
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
eqJoin.show()
eqJoin.explain(True)
// Join
DataFrame eqJoin =
empDFrame
.Join(deptDFrame, empDFrame.Col("deptId") == deptDFrame.Col("deptId"))
.Select(empDFrame.Col("empName"), deptDFrame.Col("deptName"));
eqJoin.Show();
eqJoin.Explain(true);
結果は次のようになります。
eqJoin: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534] parquet
== Physical Plan ==
*(3) Project [empName#528, deptName#534]
+- *(3) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>, SelectedBucketsCount: 200 out of 200
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>, SelectedBucketsCount: 200 out of 200
SQL セマンティクスのサポート
インデックスの使用は、データフレーム API と Spark SQL のどちらを使用しても透過的です。 次の例は、インデックスの使用を示す、前と同じ結合の例を SQL 形式で示したものです (該当する場合)。
empDFrame.createOrReplaceTempView("EMP")
deptDFrame.createOrReplaceTempView("DEPT")
val joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(true)
from pyspark.sql import SparkSession
emp_DF.createOrReplaceTempView("EMP")
dept_DF.createOrReplaceTempView("DEPT")
joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(True)
empDFrame.CreateOrReplaceTempView("EMP");
deptDFrame.CreateOrReplaceTempView("DEPT");
var joinQuery = spark.Sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId");
joinQuery.Show();
joinQuery.Explain(true);
結果は次のようになります。
joinQuery: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
'Project ['EMP.empName, 'DEPT.deptName]
+- 'Filter ('EMP.deptId = 'DEPT.deptId)
+- 'Join Inner
:- 'UnresolvedRelation `EMP`
+- 'UnresolvedRelation `DEPT`
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Filter (deptId#529 = deptId#533)
+- Join Inner
:- SubqueryAlias `emp`
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- SubqueryAlias `dept`
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(5) Project [empName#528, deptName#534]
+- *(5) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>
API を説明する
インデックスは非常に優れていますが、使用されているかどうかを確認するにはどうすればよいでしょうか。 Hyperspace では、ユーザーはクエリを実行する前に、元のプランと更新されたインデックス依存プランを比較できます。 コマンド出力を表示する場合は、HTML、プレーンテキスト、コンソール モードから選択できます。
次のセルは、HTML を使用した例を示しています。 強調表示されたセクションは、元のプランと更新されたプランの違いを、使用されているインデックスと共に表しています。
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin)(displayHTML(_))
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin, True, displayHTML)
spark.Conf().Set("spark.hyperspace.explain.displayMode", "html");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.beginTag", "<b style=\"background:LightGreen\">");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.endTag", "</b>");
hyperspace.Explain(eqJoin, false, input => DisplayHTML(input));
結果は次のようになります。
インデックスを使用したプラン
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
インデックスを使用しないプラン
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [empName#528,deptId#529] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/employees.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
使用されたインデックス
deptIndex1:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/deptIndex1/v__=0
empIndex:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/empIndex/v__=0
インデックスを最新の情報に更新する
インデックスの作成に使われた元のデータが変更された場合、そのインデックスではデータの最新の状態がキャプチャされなくなります。 refreshIndex コマンドを使用して、古くなったインデックスを最新の情報に更新できます。 このコマンドにより、インデックスが完全に再構築され、最新のデータ レコードに従って更新されます。 他のノートブックでインデックスを増分更新する方法について説明します。
次の 2 つのセルは、このシナリオの例を示しています。
- 最初のセルでは、元の部門データに部門を 2 つ追加します。 新しい部門が正しく追加されたことを確認するために、部門の一覧を読み取って出力します。 出力には、合計で 6 つの部門 (古い部門が 4 つと新しい部門が 2 つ) が表示されます。 refreshIndex を呼び出すと、"deptIndex1" が更新され、インデックスによって新しい部門がキャプチャされます。
- 2 番目のセルでは、範囲選択クエリの例を実行します。 今度は、結果に 4 つの部門 が含まれているはずです。そのうちの 2 つは前に上記のクエリを実行したときに表示された部門で、2 つは追加した新しい部門です。
特定のインデックスの更新
val extraDepartments = Seq(
(50, "Inovation", "Seattle"),
(60, "Human Resources", "San Francisco"))
val extraDeptData: DataFrame = extraDepartments.toDF("deptId", "deptName", "location")
extraDeptData.write.mode("Append").parquet(dept_Location)
val deptDFrameUpdated: DataFrame = spark.read.parquet(dept_Location)
deptDFrameUpdated.show(10)
hyperspace.refreshIndex("deptIndex1")
extra_Departments = [(50, "Inovation", "Seattle"), (60, "Human Resources", "San Francisco")]
extra_departments_df = spark.createDataFrame(extra_Departments, dept_schema)
extra_departments_df.write.mode("Append").parquet(dept_Location)
dept_DFrame_Updated = spark.read.parquet(dept_Location)
dept_DFrame_Updated.show(10)
var extraDepartments = new List<GenericRow>()
{
new GenericRow(new object[] {50, "Inovation", "Seattle"}),
new GenericRow(new object[] {60, "Human Resources", "San Francisco"})
};
DataFrame extraDeptData = spark.CreateDataFrame(extraDepartments, departmentSchema);
extraDeptData.Write().Mode("Append").Parquet(dept_Location);
DataFrame deptDFrameUpdated = spark.Read().Parquet(dept_Location);
deptDFrameUpdated.Show(10);
hyperspace.RefreshIndex("deptIndex1");
結果は次のようになります。
extraDepartments: Seq[(Int, String, String)] = List((50,Inovation,Seattle), (60,Human Resources,San Francisco))
extraDeptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
deptDFrameUpdated: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|deptId| deptName| location|
|------|---------------|-------------|
| 60|Human Resources|San Francisco|
| 10| Accounting| New York|
| 50| Inovation| Seattle|
| 40| Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
範囲選択
val newRangeFilter: DataFrame = deptDFrameUpdated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(true)
newRangeFilter = dept_DFrame_Updated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(True)
DataFrame newRangeFilter = deptDFrameUpdated.Filter("deptId > 20").Select("deptName");
newRangeFilter.Show();
newRangeFilter.Explain(true);
結果は次のようになります。
newRangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|---------------|
|Human Resources|
| Inovation|
| Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#675]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Optimized Logical Plan ==
Project [deptName#675]
+- Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- Relation[deptId#674,deptName#675,location#676] parquet
== Physical Plan ==
*(1) Project [deptName#675]
+- *(1) Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- *(1) FileScan parquet [deptId#674,deptName#675] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
変更可能なデータセットのハイブリッド スキャン
多くの場合、基になるソース データにいくつかの新しいファイルが追加されたり、既存のファイルが削除されたりした場合、インデックスは古くなり、Hyperspace では使用されなくなります。 ただし、インデックスを毎回更新することを必要とせずに、インデックスを使用したい場合があります。 これには複数の理由が考えられます。
- ワークロードが最適であると考えているため、インデックスを継続的に更新するのではなく、定期的に更新することが望ましい。
- いくつかのファイルを追加または削除しただけなので、他の更新ジョブが完了するまで待つことは望ましくない。
古くなったインデックスを引き続き使用できるようにするために、Hyperspace には、新しい手法であるハイブリッド スキャンが導入されています。これは、有効ではなくなったインデックスや古いインデックス (基となるソース データにいくつかの新しいファイルが追加されたか、既存のファイルが削除された場合など) をインデックスを更新することなく利用できるようにします。
これを実現するために、適切な構成を設定してハイブリッド スキャンを有効にすると、Hyperspace によってクエリ プランが変更され、次のようにその変更が活用されます。
- Union または BucketUnion (結合の場合) を使用して、追加されたファイルをインデックス データにマージできます。 必要に応じて、マージする前に、追加されたデータのシャッフルを適用することもできます。
- 削除されたファイルは、インデックス データのデータ系列列に NOT-IN をフィルター処理する条件を挿入することで処理できます。これにより、削除されたファイルのインデックス付けされた行をクエリ時に除外できます。
クエリ プランの変換は、次の例で確認できます。
Note
現時点では、ハイブリッド スキャンはパーティション分割されていないデータに対してのみサポートされています。
追加されたファイルのハイブリッド スキャン - パーティション分割されていないデータ
次の例では、パーティション分割されていないデータが使用されています。 この例では、クエリに結合インデックスを使用できること、および追加されたファイルに BucketUnion が使用されることを想定しています。
val testData = Seq(
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
).toDF("name", "qty", "date")
val testDataLocation = s"$dataPath/productTable"
testData.write.mode("overwrite").parquet(testDataLocation)
val testDF = spark.read.parquet(testDataLocation)
testdata = [
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
]
testdata_location = data_path + "/productTable"
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
testdata_schema = StructType([
StructField('name', StringType(), True),
StructField('qty', IntegerType(), True),
StructField('date', StringType(), True)])
test_df = spark.createDataFrame(testdata, testdata_schema)
test_df.write.mode("overwrite").parquet(testdata_location)
test_df = spark.read.parquet(testdata_location)
using Microsoft.Spark.Sql.Types;
var products = new List<GenericRow>() {
new GenericRow(new object[] {"orange", 3, "2020-10-01"}),
new GenericRow(new object[] {"banana", 1, "2020-10-01"}),
new GenericRow(new object[] {"carrot", 5, "2020-10-02"}),
new GenericRow(new object[] {"beetroot", 12, "2020-10-02"}),
new GenericRow(new object[] {"orange", 2, "2020-10-03"}),
new GenericRow(new object[] {"banana", 11, "2020-10-03"}),
new GenericRow(new object[] {"carrot", 3, "2020-10-03"}),
new GenericRow(new object[] {"beetroot", 2, "2020-10-04"}),
new GenericRow(new object[] {"cucumber", 7, "2020-10-05"}),
new GenericRow(new object[] {"pepper", 20, "2020-10-06"})
};
var productsSchema = new StructType(new List<StructField>()
{
new StructField("name", new StringType()),
new StructField("qty", new IntegerType()),
new StructField("date", new StringType())
});
DataFrame testData = spark.CreateDataFrame(products, productsSchema);
string testDataLocation = $"{dataPath}/productTable";
testData.Write().Mode("overwrite").Parquet(testDataLocation);
// CREATE INDEX
hyperspace.createIndex(testDF, IndexConfig("productIndex2", Seq("name"), Seq("date", "qty")))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val filter1 = testDF.filter("name = 'banana'")
val filter2 = testDF.filter("qty > 10")
val query = filter1.join(filter2, "name")
// Check Join index rule is applied properly.
hyperspace.explain(query)(displayHTML(_))
# CREATE INDEX
hyperspace.createIndex(test_df, IndexConfig("productIndex2", ["name"], ["date", "qty"]))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
filter1 = test_df.filter("name = 'banana'")
filter2 = test_df.filter("qty > 10")
query = filter1.join(filter2, "name")
# Check Join index rule is applied properly.
hyperspace.explain(query, True, displayHTML)
// CREATE INDEX
DataFrame testDF = spark.Read().Parquet(testDataLocation);
var productIndex2Config = new IndexConfig("productIndex", new string[] {"name"}, new string[] {"date", "qty"});
hyperspace.CreateIndex(testDF, productIndex2Config);
// Check Join index rule is applied properly.
DataFrame filter1 = testDF.Filter("name = 'banana'");
DataFrame filter2 = testDF.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,date#609,qty#608] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#631, qty#632, date#633]
+- *(2) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(2) FileScan parquet [name#631,date#633,qty#632] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(2) Sort [name#607 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#607, 200), [id=#453]
: +- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,qty#608,date#609] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#631 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#631, 200), [id=#459]
+- *(3) Project [name#631, qty#632, date#633]
+- *(3) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(3) FileScan parquet [name#631,qty#632,date#633] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
// Append new files.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
# Append new files.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
// Append new files.
var appendProducts = new List<GenericRow>()
{
new GenericRow(new object[] {"orange", 13, "2020-11-01"}),
new GenericRow(new object[] {"banana", 5, "2020-11-01"})
};
DataFrame appendData = spark.CreateDataFrame(appendProducts, productsSchema);
appendData.Write().Mode("Append").Parquet(testDataLocation);
ハイブリッド スキャンは、既定では無効になっています。 そのため、新しいデータを追加したので、Hyperspace ではインデックスが使用されないことを確認できます。
出力では、プランに違いはありません (そのため、強調表示されていません)。
// Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Hybrid Scan configs are false by default.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "false");
spark.Conf().Set("spark.hyperspace.index.hybridscan.delete.enabled", "false");
DataFrame testDFWithAppend = spark.Read().Parquet(testDataLocation);
DataFrame filter1 = testDFWithAppend.Filter("name = 'banana'");
DataFrame filter2 = testDFWithAppend.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#589]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#595]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#536]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#542]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
|banana| 1|2020-10-01| 11|2020-10-03|
+------+---+----------+---+----------
ハイブリッド スキャンを有効にする
インデックスを使用するプランでは、追加されたファイルに対してのみ Exchange ハッシュ パーティション分割が必要であることを確認できます。これにより、追加されたファイルの "シャッフルされた" インデックス データを引き続き利用できるようになります。 "シャッフルされた" 追加されたファイルをインデックス データにマージするために、BucketUnion が使用されています。
// Enable Hybrid Scan config. "delete" config is not necessary since we only appended data.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
spark.enableHyperspace
// Need to redefine query to recalculate the query plan.
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Enable Hybrid Scan config. "delete" config is not necessary.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
# Need to redefine query to recalculate the query plan.
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Enable Hybrid Scan config. "delete" config is not necessary.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "true");
spark.EnableHyperspace();
// Need to redefine query to recalculate the query plan.
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(3) Sort [name#678 ASC NULLS FIRST], false, 0
: +- BucketUnion 200 buckets, bucket columns: [name]
: :- *(1) Project [name#678, qty#679, date#680]
: : +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: : +- *(1) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
: +- Exchange hashpartitioning(name#678, 200), [id=#775]
: +- *(2) Project [name#678, qty#679, date#680]
: +- *(2) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(2) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(6) Sort [name#731 ASC NULLS FIRST], false, 0
+- BucketUnion 200 buckets, bucket columns: [name]
:- *(4) Project [name#731, qty#732, date#733]
: +- *(4) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
: +- *(4) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- Exchange hashpartitioning(name#731, 200), [id=#783]
+- *(5) Project [name#731, qty#732, date#733]
+- *(5) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(5) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#701]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#731 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#731, 200), [id=#707]
+- *(3) Project [name#731, qty#732, date#733]
+- *(3) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(3) FileScan parquet [name#731,qty#732,date#733] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
インデックスの増分更新
インデックスを更新する準備ができているが、インデックス全体を再構築したくない場合、Hyperspace では hs.refreshIndex("name", "incremental")
API を使用して増分形式でインデックスを更新することをサポートしています。 これにより、インデックスを最初から完全に再構築する必要がなくなり、以前に作成したインデックス ファイルを利用でき、新しく追加されたデータのインデックスのみを更新できます。
もちろん、パフォーマンスが低下しないように、相補的な optimizeIndex
API (以下に示す) を定期的に使用するようにしてください。 追加または削除したデータが元のデータセットの 10% 未満と想定すると、refreshIndex(..., "incremental")
を 10 回呼び出すたびに少なくとも 1 回最適化を呼び出すことをお勧めします。 たとえば、元のデータセットが 100 GB で、1 GB の増分/減分でデータを追加または削除した場合、optimizeIndex
を呼び出す前に refreshIndex
を 10 回呼び出すことができます。 この例は例示のためであり、ご使用のワークロードに合わせて調整する必要があることに注意してください。
次の例では、インデックスが使用された場合に、クエリ プランに Sort ノードが追加されていることに注意してください。 これは、追加されたデータ ファイルに部分的なインデックスが作成され、Spark によって Sort
が使用されたためです。 また、Shuffle
(つまり、Exchange) がプランから排除されたままであることにも注意してください。これにより、適切な高速化が得られます。
def query(): DataFrame = {
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
filter1.join(filter2, "name")
}
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query())(displayHTML(_))
query().show
def query():
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
return filter1.join(filter2, "name")
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query(), True, displayHTML)
query().show()
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(1) Sort [name#820 ASC NULLS FIRST], false, 0
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,date#822,qty#821] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Sort [name#826 ASC NULLS FIRST], false, 0
+- *(2) Project [name#826, qty#827, date#828]
+- *(2) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(2) FileScan parquet [name#826,date#828,qty#827] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(2) Sort [name#820 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#820, 200), [id=#927]
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,qty#821,date#822] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#826 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#826, 200), [id=#933]
+- *(3) Project [name#826, qty#827, date#828]
+- *(3) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(3) FileScan parquet [name#826,qty#827,date#828] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
インデックス レイアウトの最適化
新しく追加されたデータのために増分更新が複数回呼び出されると (ユーザーが小さなバッチでデータに書き込んだ場合やストリーミング シナリオの場合など)、インデックス ファイルの数がインデックスのパフォーマンスに大きく影響するようになる傾向があります (多数の小さなファイルの問題)。 インデックス レイアウトを最適化し、多数のファイルの問題を軽減するために、Hyperspace では hyperspace.optimizeIndex("indexName")
API が提供されています。
次のプランでは、Hyperspace によってクエリ プランで追加の Sort ノードが削除されていることに注意してください。 最適化は、1 つのファイルのみを含むインデックス バケットの並べ替えを回避するために役立ちます。 ただし、これは、optimizeIndex
の後に、すべてのインデックス バケットでバケットごとに最大 1 つのファイルがある場合にのみ当てはまります。
// Append some more data and call refresh again.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
hyperspace.refreshIndex("productIndex2", "incremental")
# Append some more data and call refresh again.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
hyperspace.refreshIndex("productIndex2", "incremental"
// Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query())(displayHTML(_))
# Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query(), True, displayHTML)
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,date#956,qty#955] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#960, qty#961, date#962]
+- *(2) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(2) FileScan parquet [name#960,date#962,qty#961] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(2) Sort [name#954 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#954, 200), [id=#1070]
: +- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,qty#955,date#956] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#960 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#960, 200), [id=#1076]
+- *(3) Project [name#960, qty#961, date#962]
+- *(3) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(3) FileScan parquet [name#960,qty#961,date#962] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=3
最適化モード
最適化の既定のモードは "クイック" モードで、定義済みのしきい値より小さいファイルが最適化の対象として選択されます。 最適化の効果を最大限に高めるために、Hyperspace では、次に示すように別の最適化モード "フル" にすることができます。 このモードでは、ファイルのサイズに関係なく、最適化のためにすべてのインデックス ファイルが選択され、最適なレイアウトのインデックスが作成されます。 ここではより多くのデータが処理されるため、これは既定の最適化モードよりも遅くなります。
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query())(displayHTML(_))
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query(), True, displayHTML)
結果:
=============================================================
Plan with indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,date#1002,qty#1001] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#1006, qty#1007, date#1008]
+- *(2) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(2) FileScan parquet [name#1006,date#1008,qty#1007] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(2) Sort [name#1000 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#1000, 200), [id=#1160]
: +- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,qty#1001,date#1002] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#1006 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#1006, 200), [id=#1166]
+- *(3) Project [name#1006, qty#1007, date#1008]
+- *(3) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(3) FileScan parquet [name#1006,qty#1007,date#1008] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=4