Hyperspace: An indexing subsystem for Apache Spark
Hyperspace introduces the ability for Apache Spark users to create indexes on their datasets, such as CSV, JSON, and Parquet, and use them for potential query and workload acceleration.
In this article, we highlight the basics of Hyperspace, emphasize its simplicity, and show how just about anyone can use it.
Disclaimer: Hyperspace helps accelerate your workloads or queries under two circumstances:
- Queries contain filters on predicates with high selectivity. For example, you might want to select 100 matching rows from a million candidate rows.
- Queries contain a join that requires heavy shuffles. For example, you might want to join a 100-GB dataset with a 10-GB dataset.
You might want to carefully monitor your workloads and determine whether indexing is helping you on a case-by-case basis.
This document is also available in notebook form, for Python, C#, and Scala.
Setup
Note
Hyperspace is supported in Azure Synapse Runtime for Apache Spark 3.1 (unsupported), and Azure Synapse Runtime for Apache Spark 3.2 (End of Support announced). However, it should be noted that Hyperspace is not supported in Azure Synapse Runtime for Apache Spark 3.3 (GA).
To begin with, start a new Spark session. Since this document is a tutorial merely to illustrate what Hyperspace can offer, you'll make a configuration change that allows us to highlight what Hyperspace is doing on small datasets.
By default, Spark uses broadcast join to optimize join queries when the data size for one side of join is small (which is the case for the sample data we use in this tutorial). Therefore, we disable broadcast joins so that later when we run join queries, Spark uses sort-merge join. This is mainly to show how Hyperspace indexes would be used at scale for accelerating join queries.
The output of running the following cell shows a reference to the successfully created Spark session and prints '-1' as the value for the modified join config, which indicates that broadcast join is successfully disabled.
// Start your Spark session
spark
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Verify that BroadcastHashJoin is set correctly
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
# Start your Spark session.
spark
# Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Verify that BroadcastHashJoin is set correctly
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.Conf().Set("spark.sql.autoBroadcastJoinThreshold", -1);
// Verify that BroadcastHashJoin is set correctly.
Console.WriteLine(spark.Conf().Get("spark.sql.autoBroadcastJoinThreshold"));
Results in:
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@297e957d
-1
Data preparation
To prepare your environment, you'll create sample data records and save them as Parquet data files. Parquet is used for illustration, but you can also use other formats such as CSV. In the subsequent cells, you'll see how you can create several Hyperspace indexes on this sample dataset and make Spark use them when running queries.
The example records correspond to two datasets: department and employee. You should configure the "emp_Location" and "dept_Location" paths so that on the storage account they point to your desired location to save generated data files.
The output of running the following cell shows contents of our datasets as lists of triplets followed by references to dataFrames created to save the content of each dataset in our preferred location.
import org.apache.spark.sql.DataFrame
// Sample department records
val departments = Seq(
(10, "Accounting", "New York"),
(20, "Research", "Dallas"),
(30, "Sales", "Chicago"),
(40, "Operations", "Boston"))
// Sample employee records
val employees = Seq(
(7369, "SMITH", 20),
(7499, "ALLEN", 30),
(7521, "WARD", 30),
(7566, "JONES", 20),
(7698, "BLAKE", 30),
(7782, "CLARK", 10),
(7788, "SCOTT", 20),
(7839, "KING", 10),
(7844, "TURNER", 30),
(7876, "ADAMS", 20),
(7900, "JAMES", 30),
(7934, "MILLER", 10),
(7902, "FORD", 20),
(7654, "MARTIN", 30))
// Save sample data in the Parquet format
import spark.implicits._
val empData: DataFrame = employees.toDF("empId", "empName", "deptId")
val deptData: DataFrame = departments.toDF("deptId", "deptName", "location")
val emp_Location: String = "/<yourpath>/employees.parquet" //TODO ** customize this location path **
val dept_Location: String = "/<yourpath>/departments.parquet" //TODO ** customize this location path **
empData.write.mode("overwrite").parquet(emp_Location)
deptData.write.mode("overwrite").parquet(dept_Location)
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
# Sample department records
departments = [(10, "Accounting", "New York"), (20, "Research", "Dallas"), (30, "Sales", "Chicago"), (40, "Operations", "Boston")]
# Sample employee records
employees = [(7369, "SMITH", 20), (7499, "ALLEN", 30), (7521, "WARD", 30), (7566, "JONES", 20), (7698, "BLAKE", 30)]
# Create a schema for the dataframe
dept_schema = StructType([StructField('deptId', IntegerType(), True), StructField('deptName', StringType(), True), StructField('location', StringType(), True)])
emp_schema = StructType([StructField('empId', IntegerType(), True), StructField('empName', StringType(), True), StructField('deptId', IntegerType(), True)])
departments_df = spark.createDataFrame(departments, dept_schema)
employees_df = spark.createDataFrame(employees, emp_schema)
#TODO ** customize this location path **
emp_Location = "/<yourpath>/employees.parquet"
dept_Location = "/<yourpath>/departments.parquet"
employees_df.write.mode("overwrite").parquet(emp_Location)
departments_df.write.mode("overwrite").parquet(dept_Location)
using Microsoft.Spark.Sql.Types;
// Sample department records
var departments = new List<GenericRow>()
{
new GenericRow(new object[] {10, "Accounting", "New York"}),
new GenericRow(new object[] {20, "Research", "Dallas"}),
new GenericRow(new object[] {30, "Sales", "Chicago"}),
new GenericRow(new object[] {40, "Operations", "Boston"})
};
// Sample employee records
var employees = new List<GenericRow>() {
new GenericRow(new object[] {7369, "SMITH", 20}),
new GenericRow(new object[] {7499, "ALLEN", 30}),
new GenericRow(new object[] {7521, "WARD", 30}),
new GenericRow(new object[] {7566, "JONES", 20}),
new GenericRow(new object[] {7698, "BLAKE", 30}),
new GenericRow(new object[] {7782, "CLARK", 10}),
new GenericRow(new object[] {7788, "SCOTT", 20}),
new GenericRow(new object[] {7839, "KING", 10}),
new GenericRow(new object[] {7844, "TURNER", 30}),
new GenericRow(new object[] {7876, "ADAMS", 20}),
new GenericRow(new object[] {7900, "JAMES", 30}),
new GenericRow(new object[] {7934, "MILLER", 10}),
new GenericRow(new object[] {7902, "FORD", 20}),
new GenericRow(new object[] {7654, "MARTIN", 30})
};
// Save sample data in the Parquet format
var departmentSchema = new StructType(new List<StructField>()
{
new StructField("deptId", new IntegerType()),
new StructField("deptName", new StringType()),
new StructField("location", new StringType())
});
var employeeSchema = new StructType(new List<StructField>()
{
new StructField("empId", new IntegerType()),
new StructField("empName", new StringType()),
new StructField("deptId", new IntegerType())
});
DataFrame empData = spark.CreateDataFrame(employees, employeeSchema);
DataFrame deptData = spark.CreateDataFrame(departments, departmentSchema);
string emp_Location = "/<yourpath>/employees.parquet"; //TODO ** customize this location path **
string dept_Location = "/<yourpath>/departments.parquet"; //TODO ** customize this location path **
empData.Write().Mode("overwrite").Parquet(emp_Location);
deptData.Write().Mode("overwrite").Parquet(dept_Location);
Results in:
departments: Seq[(Int, String, String)] = List((10,Accounting,New York), (20,Research,Dallas), (30,Sales,Chicago), (40,Operations,Boston))
employees: Seq[(Int, String, Int)] = List((7369,SMITH,20), (7499,ALLEN,30), (7521,WARD,30), (7566,JONES,20), (7698,BLAKE,30), (7782,CLARK,10), (7788,SCOTT,20), (7839,KING,10), (7844,TURNER,30), (7876,ADAMS,20), (7900,JAMES,30), (7934,MILLER,10), (7902,FORD,20), (7654,MARTIN,30))
empData: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
emp_Location: String = /your-path/employees.parquet
dept_Location: String = /your-path/departments.parquet
Let's verify the contents of the Parquet files we created to make sure they contain expected records in the correct format. Later, we'll use these data files to create Hyperspace indexes and run sample queries.
Running the following cell produces an output that displays the rows in employee and department dataFrames in a tabular form. There should be 14 employees and 4 departments, each matching with one of triplets you created in the previous cell.
// emp_Location and dept_Location are the user defined locations above to save parquet files
val empDF: DataFrame = spark.read.parquet(emp_Location)
val deptDF: DataFrame = spark.read.parquet(dept_Location)
// Verify the data is available and correct
empDF.show()
deptDF.show()
# emp_Location and dept_Location are the user-defined locations above to save parquet files
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
# Verify the data is available and correct
emp_DF.show()
dept_DF.show()
// emp_Location and dept_Location are the user-defined locations above to save parquet files
DataFrame empDF = spark.Read().Parquet(emp_Location);
DataFrame deptDF = spark.Read().Parquet(dept_Location);
// Verify the data is available and correct
empDF.Show();
deptDF.Show();
Results in:
empDF: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDF: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|EmpId|EmpName|DeptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
| 7900| JAMES| 30|
| 7934| MILLER| 10|
| 7839| KING| 10|
| 7566| JONES| 20|
| 7698| BLAKE| 30|
| 7782| CLARK| 10|
| 7788| SCOTT| 20|
| 7902| FORD| 20|
| 7654| MARTIN| 30|
|DeptId| DeptName|Location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Indexes
Hyperspace lets you create indexes on records scanned from persisted data files. After they're successfully created, an entry that corresponds to the index is added to the Hyperspace's metadata. This metadata is later used by Apache Spark's optimizer (with our extensions) during query processing to find and use proper indexes.
After indexes are created, you can perform several actions:
- Refresh if the underlying data changes. You can refresh an existing index to capture the changes.
- Delete if the index isn't needed. You can perform a soft delete, that is, the index isn't physically deleted but is marked as "deleted" so that it's no longer used in your workloads.
- Vacuum if an index is no longer required. You can vacuum an index, which forces a physical deletion of the index contents and associated metadata completely from Hyperspace's metadata.
Refresh if the underlying data changes, you can refresh an existing index to capture that. Delete if the index isn't needed, you can perform a soft-delete that is, index isn't physically deleted but is marked as 'deleted' so it's no longer used in your workloads.
The following sections show how such index management operations can be done in Hyperspace.
First, you need to import the required libraries and create an instance of Hyperspace. Later, you'll use this instance to invoke different Hyperspace APIs to create indexes on your sample data and modify those indexes.
The output of running the following cell shows a reference to the created instance of Hyperspace.
// Create an instance of Hyperspace
import com.microsoft.hyperspace._
val hyperspace: Hyperspace = Hyperspace()
from hyperspace import *
# Create an instance of Hyperspace
hyperspace = Hyperspace(spark)
// Create an instance of Hyperspace
using Microsoft.Spark.Extensions.Hyperspace;
Hyperspace hyperspace = new Hyperspace(spark);
Results in:
hyperspace: com.microsoft.hyperspace.Hyperspace = com.microsoft.hyperspace.Hyperspace@1432f740
Create indexes
To create a Hyperspace index, you need to provide two pieces of information:
- A Spark DataFrame that references the data to be indexed.
- An index configuration object, IndexConfig, which specifies the index name and the indexed and included columns of the index.
You start by creating three Hyperspace indexes on our sample data: two indexes on the department dataset named "deptIndex1" and "deptIndex2" and one index on the employee dataset named "empIndex". For each index, you need a corresponding IndexConfig to capture the name along with columns lists for the indexed and included columns. Running the following cell creates these IndexConfigs, and its output lists them.
Note
An index column is a column that appears in your filters or join conditions. An included column is a column that appears in your select/project.
For instance, in the following query:
SELECT X
FROM T
WHERE Y = 2
Y can be an index column, and X can be an included column.
// Create index configurations
import com.microsoft.hyperspace.index.IndexConfig
val empIndexConfig: IndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
val deptIndexConfig1: IndexConfig = IndexConfig("deptIndex1", Seq("deptId"), Seq("deptName"))
val deptIndexConfig2: IndexConfig = IndexConfig("deptIndex2", Seq("location"), Seq("deptName"))
# Create index configurations
emp_IndexConfig = IndexConfig("empIndex1", ["deptId"], ["empName"])
dept_IndexConfig1 = IndexConfig("deptIndex1", ["deptId"], ["deptName"])
dept_IndexConfig2 = IndexConfig("deptIndex2", ["location"], ["deptName"])
using Microsoft.Spark.Extensions.Hyperspace.Index;
var empIndexConfig = new IndexConfig("empIndex", new string[] {"deptId"}, new string[] {"empName"});
var deptIndexConfig1 = new IndexConfig("deptIndex1", new string[] {"deptId"}, new string[] {"deptName"});
var deptIndexConfig2 = new IndexConfig("deptIndex2", new string[] {"location"}, new string[] {"deptName"});
Results in:
empIndexConfig: com.microsoft.hyperspace.index.IndexConfig = [indexName: empIndex; indexedColumns: deptid; includedColumns: empname]
deptIndexConfig1: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex1; indexedColumns: deptid; includedColumns: deptname]
deptIndexConfig2: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex2; indexedColumns: location; includedColumns: deptname]
Now, you create three indexes using your index configurations. For this purpose, you invoke "createIndex" command on our Hyperspace instance. This command requires an index configuration and the dataFrame containing rows to be indexed. Running the following cell creates three indexes.
// Create indexes from configurations
import com.microsoft.hyperspace.index.Index
hyperspace.createIndex(empDF, empIndexConfig)
hyperspace.createIndex(deptDF, deptIndexConfig1)
hyperspace.createIndex(deptDF, deptIndexConfig2)
# Create indexes from configurations
hyperspace.createIndex(emp_DF, emp_IndexConfig)
hyperspace.createIndex(dept_DF, dept_IndexConfig1)
hyperspace.createIndex(dept_DF, dept_IndexConfig2)
// Create indexes from configurations
hyperspace.CreateIndex(empDF, empIndexConfig);
hyperspace.CreateIndex(deptDF, deptIndexConfig1);
hyperspace.CreateIndex(deptDF, deptIndexConfig2);
List indexes
The code that follows shows how you can list all available indexes in a Hyperspace instance. It uses "indexes" API that returns information about existing indexes as a Spark DataFrame so you can perform more operations.
For instance, you can invoke valid operations on this DataFrame for checking its content or analyzing it further (for example filtering specific indexes or grouping them according to some desired property).
The following cell uses DataFrame's 'show' action to fully print the rows and show details of our indexes in a tabular form. For each index, you can see all information Hyperspace has stored about it in the metadata. You'll immediately notice the following:
- config.indexName, config.indexedColumns, config.includedColumns, and status.status are the fields that a user normally refers to.
- dfSignature is automatically generated by Hyperspace and is unique for each index. Hyperspace uses this signature internally to maintain the index and exploit it at query time.
In the following output, all three indexes should have "ACTIVE" as status and their name, indexed columns, and included columns should match with what we defined in index configurations above.
hyperspace.indexes.show
hyperspace.indexes().show()
hyperspace.Indexes().Show();
Results in:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Delete indexes
You can drop an existing index by using the "deleteIndex" API and providing the index name. Index deletion does a soft delete: It mainly updates index's status in the Hyperspace metadata from "ACTIVE" to "DELETED". This will exclude the dropped index from any future query optimization and Hyperspace no longer picks that index for any query.
However, index files for a deleted index still remain available (since it's a soft-delete), so that the index could be restored if user asks for.
The following cell deletes index with name "deptIndex2" and lists Hyperspace metadata after that. The output should be similar to above cell for "List Indexes" except for "deptIndex2", which now should have its status changed into "DELETED".
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex2");
hyperspace.Indexes().Show();
Results in:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Restore indexes
You can use the "restoreIndex" API to restore a deleted index. This will bring back the latest version of index into ACTIVE status and makes it usable again for queries. The following cell shows an example of "restoreIndex" usage. You delete "deptIndex1" and restore it. The output shows "deptIndex1" first went into the "DELETED" status after invoking "deleteIndex" command and came back to the "ACTIVE" status after calling "restoreIndex".
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex1");
hyperspace.Indexes().Show();
hyperspace.RestoreIndex("deptIndex1");
hyperspace.Indexes().Show();
Results in:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.indexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Vacuum indexes
You can perform a hard delete, that is, fully remove files and the metadata entry for a deleted index by using the vacuumIndex command. This action is irreversible. It physically deletes all the index files, which is why it's a hard delete.
The following cell vacuums the "deptIndex2" index and shows Hyperspace metadata after vacuuming. You should see metadata entries for two indexes "deptIndex1" and "empIndex" both with "ACTIVE" status and no entry for "deptIndex2".
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.VacuumIndex("deptIndex2");
hyperspace.Indexes().Show();
Results in:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Enable or disable Hyperspace
Hyperspace provides APIs to enable or disable index usage with Spark.
- By using the enableHyperspace command, Hyperspace optimization rules become visible to the Spark optimizer and exploit existing Hyperspace indexes to optimize user queries.
- By using the disableHyperspace command, Hyperspace rules no longer apply during query optimization. Disabling Hyperspace has no impact on created indexes because they remain intact.
The following cell shows how you can use these commands to enable or disable Hyperspace. The output shows a reference to the existing Spark session whose configuration is updated.
// Enable Hyperspace
spark.enableHyperspace
// Disable Hyperspace
spark.disableHyperspace
# Enable Hyperspace
Hyperspace.enable(spark)
# Disable Hyperspace
Hyperspace.disable(spark)
// Enable Hyperspace
spark.EnableHyperspace();
// Disable Hyperspace
spark.DisableHyperspace();
Results in:
res48: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
res51: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
Index usage
To make Spark use Hyperspace indexes during query processing, you need to make sure that Hyperspace is enabled.
The following cell enables Hyperspace and creates two DataFrames containing your sample data records, which you use for running example queries. For each DataFrame, a few sample rows are printed.
// Enable Hyperspace
spark.enableHyperspace
val empDFrame: DataFrame = spark.read.parquet(emp_Location)
val deptDFrame: DataFrame = spark.read.parquet(dept_Location)
empDFrame.show(5)
deptDFrame.show(5)
# Enable Hyperspace
Hyperspace.enable(spark)
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
emp_DF.show(5)
dept_DF.show(5)
// Enable Hyperspace
spark.EnableHyperspace();
DataFrame empDFrame = spark.Read().Parquet(emp_Location);
DataFrame deptDFrame = spark.Read().Parquet(dept_Location);
empDFrame.Show(5);
deptDFrame.Show(5);
Results in:
res53: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.Spark™Session@39fe1ddb
empDFrame: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDFrame: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|empId|empName|deptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
This only shows the top five rows
|deptId| deptName|location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Index types
Currently, Hyperspace has rules to exploit indexes for two groups of queries:
- Selection queries with lookup or range selection filtering predicates.
- Join queries with an equality join predicate (that is, equijoins).
Indexes for accelerating filters
The first example query does a lookup on department records, as shown in the following cell. In SQL, this query looks like the following example:
SELECT deptName
FROM departments
WHERE deptId = 20
The output of running the following cell shows:
- Query result, which is a single department name.
- Query plan that Spark used to run the query.
In the query plan, the FileScan operator at the bottom of the plan shows the data source where the records were read from. The location of this file indicates the path to the latest version of the "deptIndex1" index. This information shows that according to the query and using Hyperspace optimization rules, Spark decided to exploit the proper index at runtime.
// Filter with equality predicate
val eqFilter: DataFrame = deptDFrame.filter("deptId = 20").select("deptName")
eqFilter.show()
eqFilter.explain(true)
# Filter with equality predicate
eqFilter = dept_DF.filter("""deptId = 20""").select("""deptName""")
eqFilter.show()
eqFilter.explain(True)
DataFrame eqFilter = deptDFrame.Filter("deptId = 20").Select("deptName");
eqFilter.Show();
eqFilter.Explain(true);
Results in:
eqFilter: org.apache.spark.sql.DataFrame = [deptName: string]
|DeptName|
|--------|
|Research|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), EqualTo(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
The second example is a range selection query on department records. In SQL, this query looks like the following example:
SELECT deptName
FROM departments
WHERE deptId > 20
Similar to the first example, the output of the following cell shows the query results (names of two departments) and the query plan. The location of the data file in the FileScan operator shows that "deptIndex1" was used to run the query.
// Filter with range selection predicate
val rangeFilter: DataFrame = deptDFrame.filter("deptId > 20").select("deptName")
rangeFilter.show()
rangeFilter.explain(true)
# Filter with range selection predicate
rangeFilter = dept_DF.filter("""deptId > 20""").select("deptName")
rangeFilter.show()
rangeFilter.explain(True)
// Filter with range selection predicate
DataFrame rangeFilter = deptDFrame.Filter("deptId > 20").Select("deptName");
rangeFilter.Show();
rangeFilter.Explain(true);
Results in:
rangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|----------|
|Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
The third example is a query joining department and employee records on the department ID. The equivalent SQL statement is shown as follows:
SELECT employees.deptId, empName, departments.deptId, deptName
FROM employees, departments
WHERE employees.deptId = departments.deptId
The output of running the following cell shows the query results, which are the names of 14 employees and the name of the department each employee works in. The query plan is also included in the output. Notice how the file locations for two FileScan operators show that Spark used "empIndex" and "deptIndex1" indexes to run the query.
// Join
val eqJoin: DataFrame =
empDFrame.
join(deptDFrame, empDFrame("deptId") === deptDFrame("deptId")).
select(empDFrame("empName"), deptDFrame("deptName"))
eqJoin.show()
eqJoin.explain(true)
# Join
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
eqJoin.show()
eqJoin.explain(True)
// Join
DataFrame eqJoin =
empDFrame
.Join(deptDFrame, empDFrame.Col("deptId") == deptDFrame.Col("deptId"))
.Select(empDFrame.Col("empName"), deptDFrame.Col("deptName"));
eqJoin.Show();
eqJoin.Explain(true);
Results in:
eqJoin: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534] parquet
== Physical Plan ==
*(3) Project [empName#528, deptName#534]
+- *(3) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>, SelectedBucketsCount: 200 out of 200
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>, SelectedBucketsCount: 200 out of 200
Support for SQL semantics
The index usage is transparent to whether you use the DataFrame API or Spark SQL. The following example shows the same join example as before, in SQL form, showing the use of indexes if applicable.
empDFrame.createOrReplaceTempView("EMP")
deptDFrame.createOrReplaceTempView("DEPT")
val joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(true)
from pyspark.sql import SparkSession
emp_DF.createOrReplaceTempView("EMP")
dept_DF.createOrReplaceTempView("DEPT")
joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(True)
empDFrame.CreateOrReplaceTempView("EMP");
deptDFrame.CreateOrReplaceTempView("DEPT");
var joinQuery = spark.Sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId");
joinQuery.Show();
joinQuery.Explain(true);
Results in:
joinQuery: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
'Project ['EMP.empName, 'DEPT.deptName]
+- 'Filter ('EMP.deptId = 'DEPT.deptId)
+- 'Join Inner
:- 'UnresolvedRelation `EMP`
+- 'UnresolvedRelation `DEPT`
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Filter (deptId#529 = deptId#533)
+- Join Inner
:- SubqueryAlias `emp`
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- SubqueryAlias `dept`
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(5) Project [empName#528, deptName#534]
+- *(5) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>
Explain API
Indexes are great, but how do you know if they're being used? Hyperspace allows users to compare their original plan versus the updated index-dependent plan before running their query. You have an option to choose from HTML, plaintext, or console mode to display the command output.
The following cell shows an example with HTML. The highlighted section represents the difference between original and updated plans along with the indexes being used.
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin)(displayHTML(_))
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin, True, displayHTML)
spark.Conf().Set("spark.hyperspace.explain.displayMode", "html");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.beginTag", "<b style=\"background:LightGreen\">");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.endTag", "</b>");
hyperspace.Explain(eqJoin, false, input => DisplayHTML(input));
Results in:
Plan with indexes
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
Plan without indexes
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [empName#528,deptId#529] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/employees.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
Indexes used
deptIndex1:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/deptIndex1/v__=0
empIndex:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/empIndex/v__=0
Refresh indexes
If the original data on which an index was created changes, the index will no longer capture the latest state of data. You can refresh a stale index by using the refreshIndex command. This command causes the index to be fully rebuilt and updates it according to the latest data records. We'll show you how to incrementally refresh your index in other notebooks.
The following two cells show an example for this scenario:
- The first cell adds two more departments to the original departments data. It reads and prints a list of departments to verify new departments are added correctly. The output shows six departments in total: four old ones and two new. Invoking refreshIndex updates "deptIndex1" so that the index captures new departments.
- The second cell runs our range selection query example. The results should now contain four departments: two are the ones seen before when we ran the preceding query, and two are the new departments we added.
Specific index refresh
val extraDepartments = Seq(
(50, "Inovation", "Seattle"),
(60, "Human Resources", "San Francisco"))
val extraDeptData: DataFrame = extraDepartments.toDF("deptId", "deptName", "location")
extraDeptData.write.mode("Append").parquet(dept_Location)
val deptDFrameUpdated: DataFrame = spark.read.parquet(dept_Location)
deptDFrameUpdated.show(10)
hyperspace.refreshIndex("deptIndex1")
extra_Departments = [(50, "Inovation", "Seattle"), (60, "Human Resources", "San Francisco")]
extra_departments_df = spark.createDataFrame(extra_Departments, dept_schema)
extra_departments_df.write.mode("Append").parquet(dept_Location)
dept_DFrame_Updated = spark.read.parquet(dept_Location)
dept_DFrame_Updated.show(10)
var extraDepartments = new List<GenericRow>()
{
new GenericRow(new object[] {50, "Inovation", "Seattle"}),
new GenericRow(new object[] {60, "Human Resources", "San Francisco"})
};
DataFrame extraDeptData = spark.CreateDataFrame(extraDepartments, departmentSchema);
extraDeptData.Write().Mode("Append").Parquet(dept_Location);
DataFrame deptDFrameUpdated = spark.Read().Parquet(dept_Location);
deptDFrameUpdated.Show(10);
hyperspace.RefreshIndex("deptIndex1");
Results in:
extraDepartments: Seq[(Int, String, String)] = List((50,Inovation,Seattle), (60,Human Resources,San Francisco))
extraDeptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
deptDFrameUpdated: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|deptId| deptName| location|
|------|---------------|-------------|
| 60|Human Resources|San Francisco|
| 10| Accounting| New York|
| 50| Inovation| Seattle|
| 40| Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Range selection
val newRangeFilter: DataFrame = deptDFrameUpdated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(true)
newRangeFilter = dept_DFrame_Updated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(True)
DataFrame newRangeFilter = deptDFrameUpdated.Filter("deptId > 20").Select("deptName");
newRangeFilter.Show();
newRangeFilter.Explain(true);
Results in:
newRangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|---------------|
|Human Resources|
| Inovation|
| Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#675]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Optimized Logical Plan ==
Project [deptName#675]
+- Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- Relation[deptId#674,deptName#675,location#676] parquet
== Physical Plan ==
*(1) Project [deptName#675]
+- *(1) Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- *(1) FileScan parquet [deptId#674,deptName#675] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
Hybrid scan for mutable datasets
Often, if your underlying source data had some new files appended or existing files deleted, your index will get stale and Hyperspace decides not to use it. However, there are times when you just want to use the index without having to refresh it every time. There could be multiple reasons for doing so:
- You don't want to continuously refresh your index, but instead want to do it periodically because you understand your workloads the best.
- You added/removed only a few files and don't want to wait for yet another refresh job to finish.
To allow you to still use a stale index, Hyperspace introduces hybrid scan, a novel technique that allows users to utilize outdated or stale indexes (for example, the underlying source data had some new files appended or existing files deleted) without refreshing indexes.
To achieve this, when you set the appropriate configuration to enable hybrid scan, Hyperspace modifies the query plan to leverage the changes as following:
- Appended files can be merged to index data by using Union or BucketUnion (for join). Shuffling appended data can also be applied before merging, if needed.
- Deleted files can be handled by injecting Filter-NOT-IN condition on lineage column of index data, so that the indexed rows from the deleted files can be excluded at query time.
You can check the transformation of the query plan in following examples.
Note
Currently, hybrid scan is supported only for non-partitioned data.
Hybrid scan for appended files - non-partitioned data
Non-partitioned data is used in the following example. In this example, we expect that the Join index can be used for the query and BucketUnion is introduced for appended files.
val testData = Seq(
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
).toDF("name", "qty", "date")
val testDataLocation = s"$dataPath/productTable"
testData.write.mode("overwrite").parquet(testDataLocation)
val testDF = spark.read.parquet(testDataLocation)
testdata = [
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
]
testdata_location = data_path + "/productTable"
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
testdata_schema = StructType([
StructField('name', StringType(), True),
StructField('qty', IntegerType(), True),
StructField('date', StringType(), True)])
test_df = spark.createDataFrame(testdata, testdata_schema)
test_df.write.mode("overwrite").parquet(testdata_location)
test_df = spark.read.parquet(testdata_location)
using Microsoft.Spark.Sql.Types;
var products = new List<GenericRow>() {
new GenericRow(new object[] {"orange", 3, "2020-10-01"}),
new GenericRow(new object[] {"banana", 1, "2020-10-01"}),
new GenericRow(new object[] {"carrot", 5, "2020-10-02"}),
new GenericRow(new object[] {"beetroot", 12, "2020-10-02"}),
new GenericRow(new object[] {"orange", 2, "2020-10-03"}),
new GenericRow(new object[] {"banana", 11, "2020-10-03"}),
new GenericRow(new object[] {"carrot", 3, "2020-10-03"}),
new GenericRow(new object[] {"beetroot", 2, "2020-10-04"}),
new GenericRow(new object[] {"cucumber", 7, "2020-10-05"}),
new GenericRow(new object[] {"pepper", 20, "2020-10-06"})
};
var productsSchema = new StructType(new List<StructField>()
{
new StructField("name", new StringType()),
new StructField("qty", new IntegerType()),
new StructField("date", new StringType())
});
DataFrame testData = spark.CreateDataFrame(products, productsSchema);
string testDataLocation = $"{dataPath}/productTable";
testData.Write().Mode("overwrite").Parquet(testDataLocation);
// CREATE INDEX
hyperspace.createIndex(testDF, IndexConfig("productIndex2", Seq("name"), Seq("date", "qty")))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val filter1 = testDF.filter("name = 'banana'")
val filter2 = testDF.filter("qty > 10")
val query = filter1.join(filter2, "name")
// Check Join index rule is applied properly.
hyperspace.explain(query)(displayHTML(_))
# CREATE INDEX
hyperspace.createIndex(test_df, IndexConfig("productIndex2", ["name"], ["date", "qty"]))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
filter1 = test_df.filter("name = 'banana'")
filter2 = test_df.filter("qty > 10")
query = filter1.join(filter2, "name")
# Check Join index rule is applied properly.
hyperspace.explain(query, True, displayHTML)
// CREATE INDEX
DataFrame testDF = spark.Read().Parquet(testDataLocation);
var productIndex2Config = new IndexConfig("productIndex", new string[] {"name"}, new string[] {"date", "qty"});
hyperspace.CreateIndex(testDF, productIndex2Config);
// Check Join index rule is applied properly.
DataFrame filter1 = testDF.Filter("name = 'banana'");
DataFrame filter2 = testDF.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,date#609,qty#608] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#631, qty#632, date#633]
+- *(2) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(2) FileScan parquet [name#631,date#633,qty#632] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(2) Sort [name#607 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#607, 200), [id=#453]
: +- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,qty#608,date#609] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#631 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#631, 200), [id=#459]
+- *(3) Project [name#631, qty#632, date#633]
+- *(3) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(3) FileScan parquet [name#631,qty#632,date#633] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
// Append new files.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
# Append new files.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
// Append new files.
var appendProducts = new List<GenericRow>()
{
new GenericRow(new object[] {"orange", 13, "2020-11-01"}),
new GenericRow(new object[] {"banana", 5, "2020-11-01"})
};
DataFrame appendData = spark.CreateDataFrame(appendProducts, productsSchema);
appendData.Write().Mode("Append").Parquet(testDataLocation);
Hybrid scan is disabled by default. Therefore, you'll see that because we appended new data, Hyperspace will decide not to use the index.
In the output, you'll see no plan differences (hence, no highlighting).
// Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Hybrid Scan configs are false by default.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "false");
spark.Conf().Set("spark.hyperspace.index.hybridscan.delete.enabled", "false");
DataFrame testDFWithAppend = spark.Read().Parquet(testDataLocation);
DataFrame filter1 = testDFWithAppend.Filter("name = 'banana'");
DataFrame filter2 = testDFWithAppend.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#589]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#595]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#536]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#542]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
|banana| 1|2020-10-01| 11|2020-10-03|
+------+---+----------+---+----------
Enable hybrid scan
In plan with indexes, you can see Exchange hash partitioning required only for appended files so that we could still utilize the "shuffled" index data with appended files. BucketUnion is used to merge "shuffled" appended files with the index data.
// Enable Hybrid Scan config. "delete" config is not necessary since we only appended data.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
spark.enableHyperspace
// Need to redefine query to recalculate the query plan.
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Enable Hybrid Scan config. "delete" config is not necessary.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
# Need to redefine query to recalculate the query plan.
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Enable Hybrid Scan config. "delete" config is not necessary.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "true");
spark.EnableHyperspace();
// Need to redefine query to recalculate the query plan.
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(3) Sort [name#678 ASC NULLS FIRST], false, 0
: +- BucketUnion 200 buckets, bucket columns: [name]
: :- *(1) Project [name#678, qty#679, date#680]
: : +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: : +- *(1) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
: +- Exchange hashpartitioning(name#678, 200), [id=#775]
: +- *(2) Project [name#678, qty#679, date#680]
: +- *(2) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(2) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(6) Sort [name#731 ASC NULLS FIRST], false, 0
+- BucketUnion 200 buckets, bucket columns: [name]
:- *(4) Project [name#731, qty#732, date#733]
: +- *(4) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
: +- *(4) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- Exchange hashpartitioning(name#731, 200), [id=#783]
+- *(5) Project [name#731, qty#732, date#733]
+- *(5) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(5) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#701]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#731 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#731, 200), [id=#707]
+- *(3) Project [name#731, qty#732, date#733]
+- *(3) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(3) FileScan parquet [name#731,qty#732,date#733] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
Incremental index refresh
When you're ready to update your indexes but don't want to rebuild your entire index, Hyperspace supports updating indexes in an incremental manner using the hs.refreshIndex("name", "incremental")
API. This will eliminates the need for a full rebuild of index from scratch, utilizing previously created index files as well as updating indexes on only the newly added data.
Of course, be sure to use the complementary optimizeIndex
API (shown below) periodically to make sure you don't see performance regressions. We recommend calling optimize at least once for every 10 times you call refreshIndex(..., "incremental")
, assuming the data you added/removed is < 10% of the original dataset. For instance, if your original dataset is 100 GB, and you've added/removed data in increments/decrements of 1 GB, you can call refreshIndex
10 times before calling optimizeIndex
. Note that this example is for illustration and you have to adapt this for your workloads.
In the example below, notice the addition of a Sort node in the query plan when indexes are used. This is because partial indexes are created on the appended data files, causing Spark to introduce a Sort
. Also note that Shuffle
that is, Exchange is still eliminated from the plan, giving you the appropriate acceleration.
def query(): DataFrame = {
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
filter1.join(filter2, "name")
}
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query())(displayHTML(_))
query().show
def query():
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
return filter1.join(filter2, "name")
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query(), True, displayHTML)
query().show()
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(1) Sort [name#820 ASC NULLS FIRST], false, 0
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,date#822,qty#821] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Sort [name#826 ASC NULLS FIRST], false, 0
+- *(2) Project [name#826, qty#827, date#828]
+- *(2) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(2) FileScan parquet [name#826,date#828,qty#827] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(2) Sort [name#820 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#820, 200), [id=#927]
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,qty#821,date#822] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#826 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#826, 200), [id=#933]
+- *(3) Project [name#826, qty#827, date#828]
+- *(3) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(3) FileScan parquet [name#826,qty#827,date#828] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
Optimize index layout
After calling incremental refreshes multiple times on newly appended data (for example, if the user writes to data in small batches or in streaming scenarios), the number of index files tend to become large affecting the performance of the index (large number of small files problem). Hyperspace provides hyperspace.optimizeIndex("indexName")
API to optimize the index layout and reduce the large files problem.
In the plan below, notice that Hyperspace has removed the extra Sort node in the query plan. Optimize can help avoiding sorting for any index bucket that contains only one file. However, this will only be true if ALL the index buckets have at most one file per bucket, after optimizeIndex
.
// Append some more data and call refresh again.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
hyperspace.refreshIndex("productIndex2", "incremental")
# Append some more data and call refresh again.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
hyperspace.refreshIndex("productIndex2", "incremental"
// Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query())(displayHTML(_))
# Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query(), True, displayHTML)
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,date#956,qty#955] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#960, qty#961, date#962]
+- *(2) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(2) FileScan parquet [name#960,date#962,qty#961] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(2) Sort [name#954 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#954, 200), [id=#1070]
: +- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,qty#955,date#956] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#960 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#960, 200), [id=#1076]
+- *(3) Project [name#960, qty#961, date#962]
+- *(3) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(3) FileScan parquet [name#960,qty#961,date#962] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=3
Optimize modes
The default mode for optimization is "quick" mode where files smaller than a predefined threshold are picked for optimization. To maximize the effect of optimization, Hyperspace allows another optimize mode "full" as shown below. This mode picks ALL index files for optimization irrespective of their file size and creates the best possible layout of the index. This is also slower than the default optimize mode because more data is being processed here.
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query())(displayHTML(_))
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query(), True, displayHTML)
Result in:
=============================================================
Plan with indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,date#1002,qty#1001] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#1006, qty#1007, date#1008]
+- *(2) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(2) FileScan parquet [name#1006,date#1008,qty#1007] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(2) Sort [name#1000 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#1000, 200), [id=#1160]
: +- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,qty#1001,date#1002] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#1006 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#1006, 200), [id=#1166]
+- *(3) Project [name#1006, qty#1007, date#1008]
+- *(3) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(3) FileScan parquet [name#1006,qty#1007,date#1008] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=4