Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Hyperspace introduce la possibilità per gli utenti di Apache Spark di creare indici nei set di dati, ad esempio CSV, JSON e Parquet e di usarli per potenziali accelerazioni di query e carico di lavoro.
In questo articolo vengono evidenziate le nozioni di base di Hyperspace, ne viene evidenziata la semplicità e viene illustrato come può essere usata da chiunque.
Dichiarazione di non responsabilità: Hyperspace consente di accelerare i carichi di lavoro o le query in due circostanze:
- Le query contengono filtri sui predicati con una selettività elevata. Ad esempio, è possibile selezionare 100 righe corrispondenti da un milione di righe candidate.
- Le query contengono un join che richiede riproduzioni con sequenze casuali pesanti. Ad esempio, è possibile unire un set di dati da 100 GB con un set di dati da 10 GB.
È consigliabile monitorare attentamente i carichi di lavoro e determinare se l'indicizzazione è utile caso per caso.
Questo documento è disponibile anche in formato notebook, per Python, C#e Scala.
Impostazione
Nota
Hyperspace è supportato nel runtime di Azure Synapse per Apache Spark 3.1 (non supportato) e nel runtime di Azure Synapse per Apache Spark 3.2 (fine del supporto annunciato). È tuttavia necessario notare che Hyperspace non è supportato nel runtime di Azure Synapse per Apache Spark 3.3 (GA).
Per iniziare, avviare una nuova sessione Spark. Poiché questo documento è un'esercitazione che mira semplicemente a illustrare ciò che Hyperspace può offrire, verrà apportata una modifica alla configurazione che consente di evidenziare le operazioni eseguite da Hyperspace su set di dati di piccole dimensioni.
Per impostazione predefinita, Spark usa il join di trasmissione per ottimizzare le query di join quando le dimensioni dei dati per un lato del join sono di piccole dimensioni (ovvero il caso per i dati di esempio usati in questa esercitazione). Di conseguenza, si disabilitano i join di trasmissione in modo che in un secondo momento quando si eseguono query di join, Spark usa un join di tipo ordina-unisci. Si tratta principalmente di mostrare in che modo gli indici Hyperspace vengono usati su larga scala per accelerare le query di join.
L'output dell'esecuzione della cella seguente mostra un riferimento alla sessione Spark creata correttamente e stampa '-1' come valore per la configurazione di join modificata, che indica che il join di trasmissione è stato disabilitato correttamente.
// Start your Spark session
spark
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Verify that BroadcastHashJoin is set correctly
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
# Start your Spark session.
spark
# Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Verify that BroadcastHashJoin is set correctly
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
// Disable BroadcastHashJoin, so Spark will use standard SortMergeJoin. Currently, Hyperspace indexes utilize SortMergeJoin to speed up query.
spark.Conf().Set("spark.sql.autoBroadcastJoinThreshold", -1);
// Verify that BroadcastHashJoin is set correctly.
Console.WriteLine(spark.Conf().Get("spark.sql.autoBroadcastJoinThreshold"));
Il risultato è il seguente:
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@297e957d
-1
Preparazione dei dati
Per preparare l'ambiente, si creano record di dati di esempio e li si salva come file di dati Parquet. Parquet viene usato per l'illustrazione, ma è anche possibile usare altri formati, ad esempio CSV. Nelle celle successive si vedrà come creare diversi indici Hyperspace in questo set di dati di esempio e usarli in Spark durante l'esecuzione di query.
I record di esempio corrispondono a due set di dati: reparto e dipendente. È necessario configurare i percorsi "emp_Location" e "dept_Location" in modo che nell'account di archiviazione puntino al percorso di salvataggio dei file di dati generati desiderato.
L'output dell'esecuzione della cella seguente mostra il contenuto dei set di dati come elenchi di triplette seguiti da riferimenti ai DataFrame creati per salvare il contenuto di ogni set di dati nella posizione preferita.
import org.apache.spark.sql.DataFrame
// Sample department records
val departments = Seq(
(10, "Accounting", "New York"),
(20, "Research", "Dallas"),
(30, "Sales", "Chicago"),
(40, "Operations", "Boston"))
// Sample employee records
val employees = Seq(
(7369, "SMITH", 20),
(7499, "ALLEN", 30),
(7521, "WARD", 30),
(7566, "JONES", 20),
(7698, "BLAKE", 30),
(7782, "CLARK", 10),
(7788, "SCOTT", 20),
(7839, "KING", 10),
(7844, "TURNER", 30),
(7876, "ADAMS", 20),
(7900, "JAMES", 30),
(7934, "MILLER", 10),
(7902, "FORD", 20),
(7654, "MARTIN", 30))
// Save sample data in the Parquet format
import spark.implicits._
val empData: DataFrame = employees.toDF("empId", "empName", "deptId")
val deptData: DataFrame = departments.toDF("deptId", "deptName", "location")
val emp_Location: String = "/<yourpath>/employees.parquet" //TODO ** customize this location path **
val dept_Location: String = "/<yourpath>/departments.parquet" //TODO ** customize this location path **
empData.write.mode("overwrite").parquet(emp_Location)
deptData.write.mode("overwrite").parquet(dept_Location)
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
# Sample department records
departments = [(10, "Accounting", "New York"), (20, "Research", "Dallas"), (30, "Sales", "Chicago"), (40, "Operations", "Boston")]
# Sample employee records
employees = [(7369, "SMITH", 20), (7499, "ALLEN", 30), (7521, "WARD", 30), (7566, "JONES", 20), (7698, "BLAKE", 30)]
# Create a schema for the dataframe
dept_schema = StructType([StructField('deptId', IntegerType(), True), StructField('deptName', StringType(), True), StructField('location', StringType(), True)])
emp_schema = StructType([StructField('empId', IntegerType(), True), StructField('empName', StringType(), True), StructField('deptId', IntegerType(), True)])
departments_df = spark.createDataFrame(departments, dept_schema)
employees_df = spark.createDataFrame(employees, emp_schema)
#TODO ** customize this location path **
emp_Location = "/<yourpath>/employees.parquet"
dept_Location = "/<yourpath>/departments.parquet"
employees_df.write.mode("overwrite").parquet(emp_Location)
departments_df.write.mode("overwrite").parquet(dept_Location)
using Microsoft.Spark.Sql.Types;
// Sample department records
var departments = new List<GenericRow>()
{
new GenericRow(new object[] {10, "Accounting", "New York"}),
new GenericRow(new object[] {20, "Research", "Dallas"}),
new GenericRow(new object[] {30, "Sales", "Chicago"}),
new GenericRow(new object[] {40, "Operations", "Boston"})
};
// Sample employee records
var employees = new List<GenericRow>() {
new GenericRow(new object[] {7369, "SMITH", 20}),
new GenericRow(new object[] {7499, "ALLEN", 30}),
new GenericRow(new object[] {7521, "WARD", 30}),
new GenericRow(new object[] {7566, "JONES", 20}),
new GenericRow(new object[] {7698, "BLAKE", 30}),
new GenericRow(new object[] {7782, "CLARK", 10}),
new GenericRow(new object[] {7788, "SCOTT", 20}),
new GenericRow(new object[] {7839, "KING", 10}),
new GenericRow(new object[] {7844, "TURNER", 30}),
new GenericRow(new object[] {7876, "ADAMS", 20}),
new GenericRow(new object[] {7900, "JAMES", 30}),
new GenericRow(new object[] {7934, "MILLER", 10}),
new GenericRow(new object[] {7902, "FORD", 20}),
new GenericRow(new object[] {7654, "MARTIN", 30})
};
// Save sample data in the Parquet format
var departmentSchema = new StructType(new List<StructField>()
{
new StructField("deptId", new IntegerType()),
new StructField("deptName", new StringType()),
new StructField("location", new StringType())
});
var employeeSchema = new StructType(new List<StructField>()
{
new StructField("empId", new IntegerType()),
new StructField("empName", new StringType()),
new StructField("deptId", new IntegerType())
});
DataFrame empData = spark.CreateDataFrame(employees, employeeSchema);
DataFrame deptData = spark.CreateDataFrame(departments, departmentSchema);
string emp_Location = "/<yourpath>/employees.parquet"; //TODO ** customize this location path **
string dept_Location = "/<yourpath>/departments.parquet"; //TODO ** customize this location path **
empData.Write().Mode("overwrite").Parquet(emp_Location);
deptData.Write().Mode("overwrite").Parquet(dept_Location);
Il risultato è il seguente:
departments: Seq[(Int, String, String)] = List((10,Accounting,New York), (20,Research,Dallas), (30,Sales,Chicago), (40,Operations,Boston))
employees: Seq[(Int, String, Int)] = List((7369,SMITH,20), (7499,ALLEN,30), (7521,WARD,30), (7566,JONES,20), (7698,BLAKE,30), (7782,CLARK,10), (7788,SCOTT,20), (7839,KING,10), (7844,TURNER,30), (7876,ADAMS,20), (7900,JAMES,30), (7934,MILLER,10), (7902,FORD,20), (7654,MARTIN,30))
empData: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
emp_Location: String = /your-path/employees.parquet
dept_Location: String = /your-path/departments.parquet
Verificare il contenuto dei file Parquet creati per assicurarsi che contengano i record previsti nel formato corretto. Successivamente, questi file di dati verranno usati per creare indici Hyperspace ed eseguire query di esempio.
L'esecuzione della cella seguente genera un output che visualizza le righe nei DataFrame dei dipendenti e del reparto in un formato tabulare. Dovrebbero essere presenti 14 dipendenti e 4 reparti, ognuno dei quali corrisponde a una delle triplette create nella cella precedente.
// emp_Location and dept_Location are the user defined locations above to save parquet files
val empDF: DataFrame = spark.read.parquet(emp_Location)
val deptDF: DataFrame = spark.read.parquet(dept_Location)
// Verify the data is available and correct
empDF.show()
deptDF.show()
# emp_Location and dept_Location are the user-defined locations above to save parquet files
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
# Verify the data is available and correct
emp_DF.show()
dept_DF.show()
// emp_Location and dept_Location are the user-defined locations above to save parquet files
DataFrame empDF = spark.Read().Parquet(emp_Location);
DataFrame deptDF = spark.Read().Parquet(dept_Location);
// Verify the data is available and correct
empDF.Show();
deptDF.Show();
Il risultato è il seguente:
empDF: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDF: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|EmpId|EmpName|DeptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
| 7900| JAMES| 30|
| 7934| MILLER| 10|
| 7839| KING| 10|
| 7566| JONES| 20|
| 7698| BLAKE| 30|
| 7782| CLARK| 10|
| 7788| SCOTT| 20|
| 7902| FORD| 20|
| 7654| MARTIN| 30|
|DeptId| DeptName|Location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Indici
Hyperspace consente di creare indici sui record analizzati da file di dati persistenti. Dopo la creazione, viene aggiunta una voce corrispondente all'indice ai metadati di Hyperspace. Questi metadati vengono usati successivamente dall'ottimizzatore di Apache Spark (con le estensioni) durante l'elaborazione delle query per trovare e usare indici appropriati.
Dopo aver creato gli indici, è possibile eseguire diverse azioni:
- Aggiornare se i dati sottostanti cambiano. È possibile aggiornare un indice esistente per acquisire le modifiche.
- Eliminare se l'indice non è necessario. È possibile eseguire un'eliminazione temporanea, ovvero l'indice non viene eliminato fisicamente, ma è contrassegnato come "eliminato" in modo che non venga più usato nei carichi di lavoro.
- Eseguire un’azione vacuum se un indice non è più necessario. È possibile eseguire un’azione vacuum su un indice, che forza l'eliminazione fisica del contenuto dell'indice e i metadati associati completamente dai metadati di Hyperspace.
Aggiornare se i dati sottostanti cambiano, è possibile aggiornare un indice esistente per acquisirlo. Eliminare se l'indice non è necessario, è possibile eseguire un'eliminazione temporanea, ovvero l'indice non viene eliminato fisicamente, ma viene contrassegnato come "eliminato" in modo che non venga più usato nei carichi di lavoro.
Nelle sezioni seguenti viene illustrato come eseguire tali operazioni di gestione degli indici in Hyperspace.
Prima di tutto, è necessario importare le librerie necessarie e creare un'istanza di Hyperspace. Successivamente, si userà questa istanza per richiamare api Hyperspace diverse per creare indici sui dati di esempio e modificare tali indici.
L'output dell'esecuzione della cella seguente mostra un riferimento all'istanza creata di Hyperspace.
// Create an instance of Hyperspace
import com.microsoft.hyperspace._
val hyperspace: Hyperspace = Hyperspace()
from hyperspace import *
# Create an instance of Hyperspace
hyperspace = Hyperspace(spark)
// Create an instance of Hyperspace
using Microsoft.Spark.Extensions.Hyperspace;
Hyperspace hyperspace = new Hyperspace(spark);
Il risultato è il seguente:
hyperspace: com.microsoft.hyperspace.Hyperspace = com.microsoft.hyperspace.Hyperspace@1432f740
Creare indici
Per creare un indice Hyperspace, è necessario fornire due informazioni:
- DataFrame Spark che fa riferimento ai dati da indicizzare.
- Oggetto di configurazione dell'indice IndexConfig, che specifica il nome dell'indice e le colonne indicizzate e incluse dell'indice.
Per iniziare, creare tre indici Hyperspace sui dati di esempio: due indici nel set di dati del reparto denominato "deptIndex1" e "deptIndex2" e un indice nel set di dati dei dipendenti denominato "empIndex". Per ogni indice, è necessario un IndexConfig corrispondente per acquisire il nome insieme agli elenchi di colonne per le colonne indicizzate e incluse. L'esecuzione della cella seguente crea questi IndexConfig e ne elenca gli output.
Nota
Una colonna di indice è una colonna visualizzata nei filtri o nelle condizioni di join. Una colonna inclusa è una colonna visualizzata nella selezione/progetto.
Ad esempio, nella query seguente:
SELECT X
FROM T
WHERE Y = 2
Y può essere una colonna di indice e X una colonna inclusa.
// Create index configurations
import com.microsoft.hyperspace.index.IndexConfig
val empIndexConfig: IndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
val deptIndexConfig1: IndexConfig = IndexConfig("deptIndex1", Seq("deptId"), Seq("deptName"))
val deptIndexConfig2: IndexConfig = IndexConfig("deptIndex2", Seq("location"), Seq("deptName"))
# Create index configurations
emp_IndexConfig = IndexConfig("empIndex1", ["deptId"], ["empName"])
dept_IndexConfig1 = IndexConfig("deptIndex1", ["deptId"], ["deptName"])
dept_IndexConfig2 = IndexConfig("deptIndex2", ["location"], ["deptName"])
using Microsoft.Spark.Extensions.Hyperspace.Index;
var empIndexConfig = new IndexConfig("empIndex", new string[] {"deptId"}, new string[] {"empName"});
var deptIndexConfig1 = new IndexConfig("deptIndex1", new string[] {"deptId"}, new string[] {"deptName"});
var deptIndexConfig2 = new IndexConfig("deptIndex2", new string[] {"location"}, new string[] {"deptName"});
Il risultato è il seguente:
empIndexConfig: com.microsoft.hyperspace.index.IndexConfig = [indexName: empIndex; indexedColumns: deptid; includedColumns: empname]
deptIndexConfig1: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex1; indexedColumns: deptid; includedColumns: deptname]
deptIndexConfig2: com.microsoft.hyperspace.index.IndexConfig = [indexName: deptIndex2; indexedColumns: location; includedColumns: deptname]
A questo punto si creano tre indici usando le configurazioni dell'indice. A tale scopo, richiamare il comando "createIndex" nell'istanza di Hyperspace. Questo comando richiede una configurazione dell'indice e il DataFrame contenente righe da indicizzare. L'esecuzione della cella seguente crea tre indici.
// Create indexes from configurations
import com.microsoft.hyperspace.index.Index
hyperspace.createIndex(empDF, empIndexConfig)
hyperspace.createIndex(deptDF, deptIndexConfig1)
hyperspace.createIndex(deptDF, deptIndexConfig2)
# Create indexes from configurations
hyperspace.createIndex(emp_DF, emp_IndexConfig)
hyperspace.createIndex(dept_DF, dept_IndexConfig1)
hyperspace.createIndex(dept_DF, dept_IndexConfig2)
// Create indexes from configurations
hyperspace.CreateIndex(empDF, empIndexConfig);
hyperspace.CreateIndex(deptDF, deptIndexConfig1);
hyperspace.CreateIndex(deptDF, deptIndexConfig2);
Elencare gli indici
Il codice seguente mostra come elencare tutti gli indici disponibili in un'istanza di Hyperspace. Usa l'API "indici" che restituisce informazioni sugli indici esistenti come DataFrame Spark, in modo che possano essere eseguite più operazioni.
Ad esempio, è possibile richiamare operazioni valide su questo DataFrame per controllarne il contenuto o analizzarlo ulteriormente, ad esempio filtrando indici specifici o raggruppandoli in base a una proprietà desiderata.
Nella cella seguente viene utilizzata l'azione 'show' del DataFrame per stampare completamente le righe e visualizzare i dettagli degli indici in un formato tabulare. Per ogni indice, è possibile visualizzare tutte le informazioni archiviate da Hyperspace nei metadati. Si noterà immediatamente quanto segue:
- config.indexName, config.indexedColumns, config.includedColumns e status.status sono i campi a cui normalmente fa riferimento un utente.
- dfSignature viene generato automaticamente da Hyperspace ed è univoco per ogni indice. Hyperspace usa internamente questa firma per mantenere l'indice e sfruttarlo in fase di query.
Nell'output seguente, tutti e tre gli indici devono avere "ACTIVE" come stato e il nome, le colonne indicizzate e le colonne incluse devono corrispondere a quanto definito nelle configurazioni dell'indice precedente.
hyperspace.indexes.show
hyperspace.indexes().show()
hyperspace.Indexes().Show();
Il risultato è il seguente:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Eliminare gli indici
È possibile eliminare un indice esistente usando l'API "deleteIndex" e specificando il nome dell'indice. L'eliminazione dell'indice esegue un'eliminazione temporanea: aggiorna principalmente lo stato dell'indice nei metadati Hyperspace da "ACTIVE" a "DELETED". Questo escluderà l'indice eliminato da qualsiasi ottimizzazione delle query futura e Hyperspace non seleziona più l'indice per qualsiasi query.
Tuttavia, i file di indice per un indice eliminato rimangono ancora disponibili (poiché si tratta di un'eliminazione temporanea), in modo che l'indice possa essere ripristinato se l'utente lo richiede.
La cella seguente elimina l'indice con il nome "deptIndex2" ed elenca i metadati Hyperspace dopo di esso. L'output dovrebbe essere simile alla cella precedente per "List Indexes" ad eccezione di "deptIndex2", che ora dovrebbe avere lo stato modificato in "DELETED".
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex2");
hyperspace.Indexes().Show();
Il risultato è il seguente:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Ripristinare gli indici
È possibile usare l'API "restoreIndex" per ripristinare un indice eliminato. In questo modo verrà restituita la versione più recente dell'indice in stato ACTIVE e sarà nuovamente utilizzabile per le query. La cella seguente mostra un esempio di utilizzo di "restoreIndex". Eliminare "deptIndex1" e ripristinarlo. L'output mostra che "deptIndex1" è passato prima allo stato "DELETED" dopo aver richiamato il comando "deleteIndex" ed è tornato allo stato "ACTIVE" dopo aver chiamato "restoreIndex".
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes.show
hyperspace.deleteIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.restoreIndex("deptIndex1")
hyperspace.indexes().show()
hyperspace.DeleteIndex("deptIndex1");
hyperspace.Indexes().Show();
hyperspace.RestoreIndex("deptIndex1");
hyperspace.Indexes().Show();
Il risultato è il seguente:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.indexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| deptIndex2| [location]| [deptName]|`location` STRING...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| DELETED| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Eseguire un’operazione vacuum sugli indici
È possibile eseguire un'eliminazione rigida, ovvero rimuovere completamente i file e la voce di metadati per un indice eliminato usando il comando vacuumIndex. Questa azione è irreversibile. Elimina fisicamente tutti i file di indice, motivo per cui si tratta di un'eliminazione complessa.
La cella seguente esegue il vuoto dell'indice "deptIndex2" e mostra i metadati Hyperspace dopo il vuoto. Verranno visualizzate voci di metadati per due indici "deptIndex1" e "empIndex" con stato "ACTIVE" e nessuna voce per "deptIndex2".
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes.show
hyperspace.vacuumIndex("deptIndex2")
hyperspace.indexes().show()
hyperspace.VacuumIndex("deptIndex2");
hyperspace.Indexes().Show();
Il risultato è il seguente:
|Config.IndexName|Config.IndexedColumns|Config.IncludedColumns| SchemaString| SignatureProvider| DfSignature| SerializedPlan|NumBuckets| DirPath|Status.Value|Stats.IndexSize|
|----------------|---------------------|----------------------|--------------------|--------------------|--------------------|--------------------|----------|--------------------|------------|---------------|
| deptIndex1| [deptId]| [deptName]|`deptId` INT,`dep...|com.microsoft.cha...|0effc1610ae2e7c49...|Relation[deptId#3...| 200|abfss://datasets@...| ACTIVE| 0|
| empIndex| [deptId]| [empName]|`deptId` INT,`emp...|com.microsoft.cha...|30768c6c9b2533004...|Relation[empId#32...| 200|abfss://datasets@...| ACTIVE| 0|
Abilitare o disabilitare Hyperspace
Hyperspace fornisce API per abilitare o disabilitare l'utilizzo degli indici con Spark.
- Usando il comando enableHyperspace, le regole di ottimizzazione hyperspace diventano visibili all'utilità di ottimizzazione Spark e sfruttano gli indici Hyperspace esistenti per ottimizzare le query utente.
- Usando il comando disableHyperspace, le regole Hyperspace non vengono più applicate durante l'ottimizzazione delle query. La disabilitazione di Hyperspace non ha alcun impatto sugli indici creati perché i suddetti rimangono intatti.
La cella seguente illustra come usare questi comandi per abilitare o disabilitare Hyperspace. L'output mostra un riferimento alla sessione Spark esistente la cui configurazione viene aggiornata.
// Enable Hyperspace
spark.enableHyperspace
// Disable Hyperspace
spark.disableHyperspace
# Enable Hyperspace
Hyperspace.enable(spark)
# Disable Hyperspace
Hyperspace.disable(spark)
// Enable Hyperspace
spark.EnableHyperspace();
// Disable Hyperspace
spark.DisableHyperspace();
Il risultato è il seguente:
res48: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
res51: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.SparkSession@39fe1ddb
Utilizzo dell'indice
Per fare in modo che Spark usi gli indici Hyperspace durante l'elaborazione delle query, è necessario assicurarsi che Hyperspace sia abilitato.
La cella seguente abilita Hyperspace e crea due DataFrame contenenti i record di dati di esempio, che vengono usati per l'esecuzione di query di esempio. Per ogni DataFrame vengono stampate alcune righe di esempio.
// Enable Hyperspace
spark.enableHyperspace
val empDFrame: DataFrame = spark.read.parquet(emp_Location)
val deptDFrame: DataFrame = spark.read.parquet(dept_Location)
empDFrame.show(5)
deptDFrame.show(5)
# Enable Hyperspace
Hyperspace.enable(spark)
emp_DF = spark.read.parquet(emp_Location)
dept_DF = spark.read.parquet(dept_Location)
emp_DF.show(5)
dept_DF.show(5)
// Enable Hyperspace
spark.EnableHyperspace();
DataFrame empDFrame = spark.Read().Parquet(emp_Location);
DataFrame deptDFrame = spark.Read().Parquet(dept_Location);
empDFrame.Show(5);
deptDFrame.Show(5);
Il risultato è il seguente:
res53: org.apache.spark.sql.Spark™Session = org.apache.spark.sql.Spark™Session@39fe1ddb
empDFrame: org.apache.spark.sql.DataFrame = [empId: int, empName: string ... 1 more field]
deptDFrame: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|empId|empName|deptId|
|-----|-------|------|
| 7499| ALLEN| 30|
| 7521| WARD| 30|
| 7369| SMITH| 20|
| 7844| TURNER| 30|
| 7876| ADAMS| 20|
Vengono visualizzate solo le prime cinque righe
|deptId| deptName|location|
|------|----------|--------|
| 10|Accounting|New York|
| 40|Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Tipi di indice
Attualmente, Hyperspace dispone di regole per sfruttare gli indici per due gruppi di query:
- Query di selezione con predicati di filtro di ricerca o selezione di intervalli.
- Unire le query con un predicato di join di uguaglianza, ovvero equijoins.
Indici per l'accelerazione dei filtri
La prima query di esempio esegue una ricerca sui record dei reparti, come illustrato nella cella seguente. In SQL, la query è simile all'esempio seguente:
SELECT deptName
FROM departments
WHERE deptId = 20
L'output dell'esecuzione della cella seguente mostra:
- Risultato della query, ovvero un singolo nome di reparto.
- Piano di query usato da Spark per eseguire la query.
Nel piano di query l'operatore FileScan nella parte inferiore del piano mostra l'origine dati da cui sono stati letti i record. Il percorso di questo file indica il percorso dell'ultima versione dell'indice "deptIndex1". Queste informazioni mostrano che, in base alla query e usando le regole di ottimizzazione Hyperspace, Spark ha deciso di sfruttare l'indice appropriato in fase di esecuzione.
// Filter with equality predicate
val eqFilter: DataFrame = deptDFrame.filter("deptId = 20").select("deptName")
eqFilter.show()
eqFilter.explain(true)
# Filter with equality predicate
eqFilter = dept_DF.filter("""deptId = 20""").select("""deptName""")
eqFilter.show()
eqFilter.explain(True)
DataFrame eqFilter = deptDFrame.Filter("deptId = 20").Select("deptName");
eqFilter.Show();
eqFilter.Explain(true);
Il risultato è il seguente:
eqFilter: org.apache.spark.sql.DataFrame = [deptName: string]
|DeptName|
|--------|
|Research|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 = 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 = 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), EqualTo(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
Il secondo esempio è una query di selezione dell'intervallo sui record dei reparti. In SQL, la query è simile all'esempio seguente:
SELECT deptName
FROM departments
WHERE deptId > 20
Analogamente al primo esempio, l'output della cella seguente mostra i risultati della query (nomi di due reparti) e il piano di query. Il percorso del file di dati nell'operatore FileScan indica che "deptIndex1" è stato usato per eseguire la query.
// Filter with range selection predicate
val rangeFilter: DataFrame = deptDFrame.filter("deptId > 20").select("deptName")
rangeFilter.show()
rangeFilter.explain(true)
# Filter with range selection predicate
rangeFilter = dept_DF.filter("""deptId > 20""").select("deptName")
rangeFilter.show()
rangeFilter.explain(True)
// Filter with range selection predicate
DataFrame rangeFilter = deptDFrame.Filter("deptId > 20").Select("deptName");
rangeFilter.Show();
rangeFilter.Explain(true);
Il risultato è il seguente:
rangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|----------|
|Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#534]
+- Filter (deptId#533 > 20)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [deptName#534]
+- Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(1) Project [deptName#534]
+- *(1) Filter (isnotnull(deptId#533) && (deptId#533 > 20))
+- *(1) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
Il terzo esempio è costituito da una query che unisce i record del reparto e dei dipendenti nell'ID reparto. L'istruzione SQL equivalente è illustrata di seguito:
SELECT employees.deptId, empName, departments.deptId, deptName
FROM employees, departments
WHERE employees.deptId = departments.deptId
L'output dell'esecuzione della cella seguente mostra i risultati della query, ovvero i nomi di 14 dipendenti e il nome del reparto in cui lavora ogni dipendente. Il piano di query è incluso anche nell'output. Si noti che i percorsi dei file per due operatori FileScan mostrano che Spark usava gli indici "empIndex" e "deptIndex1" per eseguire la query.
// Join
val eqJoin: DataFrame =
empDFrame.
join(deptDFrame, empDFrame("deptId") === deptDFrame("deptId")).
select(empDFrame("empName"), deptDFrame("deptName"))
eqJoin.show()
eqJoin.explain(true)
# Join
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
eqJoin.show()
eqJoin.explain(True)
// Join
DataFrame eqJoin =
empDFrame
.Join(deptDFrame, empDFrame.Col("deptId") == deptDFrame.Col("deptId"))
.Select(empDFrame.Col("empName"), deptDFrame.Col("deptName"));
eqJoin.Show();
eqJoin.Explain(true);
Il risultato è il seguente:
eqJoin: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Relation[empId#527,empName#528,deptId#529] parquet
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534] parquet
== Physical Plan ==
*(3) Project [empName#528, deptName#534]
+- *(3) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>, SelectedBucketsCount: 200 out of 200
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>, SelectedBucketsCount: 200 out of 200
Supporto per la semantica SQL
L'utilizzo dell'indice è trasparente se si usa l'API DataFrame o Spark SQL. Nell'esempio seguente viene illustrato lo stesso esempio di join precedente, in formato SQL, che mostra l'uso di indici, se applicabile.
empDFrame.createOrReplaceTempView("EMP")
deptDFrame.createOrReplaceTempView("DEPT")
val joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(true)
from pyspark.sql import SparkSession
emp_DF.createOrReplaceTempView("EMP")
dept_DF.createOrReplaceTempView("DEPT")
joinQuery = spark.sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId")
joinQuery.show()
joinQuery.explain(True)
empDFrame.CreateOrReplaceTempView("EMP");
deptDFrame.CreateOrReplaceTempView("DEPT");
var joinQuery = spark.Sql("SELECT EMP.empName, DEPT.deptName FROM EMP, DEPT WHERE EMP.deptId = DEPT.deptId");
joinQuery.Show();
joinQuery.Explain(true);
Il risultato è il seguente:
joinQuery: org.apache.spark.sql.DataFrame = [empName: string, deptName: string]
|empName| deptName|
|-------|----------|
| SMITH| Research|
| JONES| Research|
| FORD| Research|
| ADAMS| Research|
| SCOTT| Research|
| KING|Accounting|
| CLARK|Accounting|
| MILLER|Accounting|
| JAMES| Sales|
| BLAKE| Sales|
| MARTIN| Sales|
| ALLEN| Sales|
| WARD| Sales|
| TURNER| Sales|
== Parsed Logical Plan ==
'Project ['EMP.empName, 'DEPT.deptName]
+- 'Filter ('EMP.deptId = 'DEPT.deptId)
+- 'Join Inner
:- 'UnresolvedRelation `EMP`
+- 'UnresolvedRelation `DEPT`
== Analyzed Logical Plan ==
empName: string, deptName: string
Project [empName#528, deptName#534]
+- Filter (deptId#529 = deptId#533)
+- Join Inner
:- SubqueryAlias `emp`
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- SubqueryAlias `dept`
+- Relation[deptId#533,deptName#534,location#535] parquet
== Optimized Logical Plan ==
Project [empName#528, deptName#534]
+- Join Inner, (deptId#529 = deptId#533)
:- Project [empName#528, deptId#529]
: +- Filter isnotnull(deptId#529)
: +- Relation[empId#527,empName#528,deptId#529] parquet
+- Project [deptId#533, deptName#534]
+- Filter isnotnull(deptId#533)
+- Relation[deptId#533,deptName#534,location#535] parquet
== Physical Plan ==
*(5) Project [empName#528, deptName#534]
+- *(5) SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,empName:string>
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct<deptId:int,deptName:string>
Spiegare l'API
Gli indici sono ottimi, ma come si sa se vengono usati? Hyperspace consente agli utenti di confrontare il piano originale rispetto al piano dipendente dall'indice aggiornato prima di eseguire la query. È possibile scegliere tra HTML, testo non crittografato o modalità console per visualizzare l'output del comando.
La cella seguente mostra un esempio con HTML. La sezione evidenziata rappresenta la differenza tra i piani originali e aggiornati insieme agli indici usati.
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin)(displayHTML(_))
eqJoin = emp_DF.join(dept_DF, emp_DF.deptId == dept_DF.deptId).select(emp_DF.empName, dept_DF.deptName)
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
hyperspace.explain(eqJoin, True, displayHTML)
spark.Conf().Set("spark.hyperspace.explain.displayMode", "html");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.beginTag", "<b style=\"background:LightGreen\">");
spark.Conf().Set("spark.hyperspace.explain.displayMode.highlight.endTag", "</b>");
hyperspace.Explain(eqJoin, false, input => DisplayHTML(input));
Il risultato è il seguente:
Pianificare con indici
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [deptId#529,empName#528] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(2) Project [deptId#533, deptName#534]
+- *(2) Filter isnotnull(deptId#533)
+- *(2) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
Pianificare senza indici
Project [empName#528, deptName#534]
+- SortMergeJoin [deptId#529], [deptId#533], Inner
:- *(2) Sort [deptId#529 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(deptId#529, 200)
: +- *(1) Project [empName#528, deptId#529]
: +- *(1) Filter isnotnull(deptId#529)
: +- *(1) FileScan parquet [empName#528,deptId#529] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/employees.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
+- *(4) Sort [deptId#533 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(deptId#533, 200)
+- *(3) Project [deptId#533, deptName#534]
+- *(3) Filter isnotnull(deptId#533)
+- *(3) FileScan parquet [deptId#533,deptName#534] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/your-path/departments.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(deptId)], ReadSchema: struct
Indici usati
deptIndex1:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/deptIndex1/v__=0
empIndex:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/<container>/indexes/public/empIndex/v__=0
Aggiornare gli indici
Se i dati originali in cui è stato creato un indice cambiano, l'indice non acquisisce più lo stato più recente dei dati. È possibile aggiornare un indice non aggiornato usando il comando refreshIndex. Questo comando determina la ricompilazione completa dell'indice e lo aggiorna in base ai record di dati più recenti. Verrà illustrato come aggiornare in modo incrementale l'indice in altri notebook.
Le due celle seguenti illustrano un esempio per questo scenario:
- La prima cella aggiunge altri due reparti ai dati dei reparti originali. Legge e stampa un elenco di reparti per verificare che i nuovi reparti vengano aggiunti correttamente. L'output mostra sei reparti in totale: quattro vecchi e due nuovi. La chiamata di refreshIndex aggiorna "deptIndex1" in modo che l'indice acquisisca nuovi reparti.
- La seconda cella esegue l'esempio di query di selezione dell'intervallo. I risultati dovrebbero ora contenere quattro reparti: due sono quelli visualizzati prima di quando è stata eseguita la query precedente e due sono i nuovi reparti aggiunti.
Aggiornamento di indici specifici
val extraDepartments = Seq(
(50, "Inovation", "Seattle"),
(60, "Human Resources", "San Francisco"))
val extraDeptData: DataFrame = extraDepartments.toDF("deptId", "deptName", "location")
extraDeptData.write.mode("Append").parquet(dept_Location)
val deptDFrameUpdated: DataFrame = spark.read.parquet(dept_Location)
deptDFrameUpdated.show(10)
hyperspace.refreshIndex("deptIndex1")
extra_Departments = [(50, "Inovation", "Seattle"), (60, "Human Resources", "San Francisco")]
extra_departments_df = spark.createDataFrame(extra_Departments, dept_schema)
extra_departments_df.write.mode("Append").parquet(dept_Location)
dept_DFrame_Updated = spark.read.parquet(dept_Location)
dept_DFrame_Updated.show(10)
var extraDepartments = new List<GenericRow>()
{
new GenericRow(new object[] {50, "Inovation", "Seattle"}),
new GenericRow(new object[] {60, "Human Resources", "San Francisco"})
};
DataFrame extraDeptData = spark.CreateDataFrame(extraDepartments, departmentSchema);
extraDeptData.Write().Mode("Append").Parquet(dept_Location);
DataFrame deptDFrameUpdated = spark.Read().Parquet(dept_Location);
deptDFrameUpdated.Show(10);
hyperspace.RefreshIndex("deptIndex1");
Il risultato è il seguente:
extraDepartments: Seq[(Int, String, String)] = List((50,Inovation,Seattle), (60,Human Resources,San Francisco))
extraDeptData: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
deptDFrameUpdated: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string ... 1 more field]
|deptId| deptName| location|
|------|---------------|-------------|
| 60|Human Resources|San Francisco|
| 10| Accounting| New York|
| 50| Inovation| Seattle|
| 40| Operations| Boston|
| 20| Research| Dallas|
| 30| Sales| Chicago|
Selezione intervallo
val newRangeFilter: DataFrame = deptDFrameUpdated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(true)
newRangeFilter = dept_DFrame_Updated.filter("deptId > 20").select("deptName")
newRangeFilter.show()
newRangeFilter.explain(True)
DataFrame newRangeFilter = deptDFrameUpdated.Filter("deptId > 20").Select("deptName");
newRangeFilter.Show();
newRangeFilter.Explain(true);
Il risultato è il seguente:
newRangeFilter: org.apache.spark.sql.DataFrame = [deptName: string]
| DeptName|
|---------------|
|Human Resources|
| Inovation|
| Operations|
| Sales|
== Parsed Logical Plan ==
'Project [unresolvedalias('deptName, None)]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Analyzed Logical Plan ==
deptName: string
Project [deptName#675]
+- Filter (deptId#674 > 20)
+- Relation[deptId#674,deptName#675,location#676] parquet
== Optimized Logical Plan ==
Project [deptName#675]
+- Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- Relation[deptId#674,deptName#675,location#676] parquet
== Physical Plan ==
*(1) Project [deptName#675]
+- *(1) Filter (isnotnull(deptId#674) && (deptId#674 > 20))
+- *(1) FileScan parquet [deptId#674,deptName#675] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspaceon..., PartitionFilters: [], PushedFilters: [IsNotNull(deptId), GreaterThan(deptId,20)], ReadSchema: struct<deptId:int,deptName:string>
Analisi ibrida per set di dati modificabili
Spesso, se i dati di origine sottostanti avevano alcuni nuovi file aggiunti o esistenti eliminati, l'indice non verrà aggiornato e Hyperspace decide di non usarlo. Tuttavia, in alcuni casi, è sufficiente usare l'indice senza dover aggiornarlo ogni volta. Le cause possono essere molteplici:
- Non si desidera aggiornare continuamente l'indice, ma si vuole farlo periodicamente, perché si conoscono i carichi di lavoro al meglio.
- Sono stati aggiunti o rimossi solo alcuni file e non si desidera attendere il completamento di un altro processo di aggiornamento.
Per consentire di usare ancora un indice non aggiornato, Hyperspace introduce l'analisi ibrida, una nuova tecnica che consente agli utenti di usare indici obsoleti o non aggiornati (ad esempio, i dati di origine sottostanti avevano alcuni nuovi file aggiunti o file esistenti eliminati) senza aggiornare gli indici.
A tale scopo, quando si imposta la configurazione appropriata per abilitare l'analisi ibrida, Hyperspace modifica il piano di query per sfruttare le modifiche come indicato di seguito:
- I file accodati possono essere uniti ai dati dell'indice usando Union o BucketUnion (per il join). Se necessario, è anche possibile applicare i dati accodati casuali prima dell'unione.
- I file eliminati possono essere gestiti inserendo la condizione Filter-NOT-IN sulla colonna di derivazione dei dati dell'indice, in modo che le righe indicizzate dai file eliminati possano essere escluse in fase di query.
È possibile controllare la trasformazione del piano di query negli esempi seguenti.
Nota
Attualmente, l'analisi ibrida è supportata solo per i dati non partizionati.
Analisi ibrida per i file aggiunti - Dati non partizionati
I dati non partizionati vengono usati nell'esempio seguente. In questo esempio si prevede che l'indice join possa essere usato per la query e bucketUnion viene introdotto per i file accodati.
val testData = Seq(
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
).toDF("name", "qty", "date")
val testDataLocation = s"$dataPath/productTable"
testData.write.mode("overwrite").parquet(testDataLocation)
val testDF = spark.read.parquet(testDataLocation)
testdata = [
("orange", 3, "2020-10-01"),
("banana", 1, "2020-10-01"),
("carrot", 5, "2020-10-02"),
("beetroot", 12, "2020-10-02"),
("orange", 2, "2020-10-03"),
("banana", 11, "2020-10-03"),
("carrot", 3, "2020-10-03"),
("beetroot", 2, "2020-10-04"),
("cucumber", 7, "2020-10-05"),
("pepper", 20, "2020-10-06")
]
testdata_location = data_path + "/productTable"
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
testdata_schema = StructType([
StructField('name', StringType(), True),
StructField('qty', IntegerType(), True),
StructField('date', StringType(), True)])
test_df = spark.createDataFrame(testdata, testdata_schema)
test_df.write.mode("overwrite").parquet(testdata_location)
test_df = spark.read.parquet(testdata_location)
using Microsoft.Spark.Sql.Types;
var products = new List<GenericRow>() {
new GenericRow(new object[] {"orange", 3, "2020-10-01"}),
new GenericRow(new object[] {"banana", 1, "2020-10-01"}),
new GenericRow(new object[] {"carrot", 5, "2020-10-02"}),
new GenericRow(new object[] {"beetroot", 12, "2020-10-02"}),
new GenericRow(new object[] {"orange", 2, "2020-10-03"}),
new GenericRow(new object[] {"banana", 11, "2020-10-03"}),
new GenericRow(new object[] {"carrot", 3, "2020-10-03"}),
new GenericRow(new object[] {"beetroot", 2, "2020-10-04"}),
new GenericRow(new object[] {"cucumber", 7, "2020-10-05"}),
new GenericRow(new object[] {"pepper", 20, "2020-10-06"})
};
var productsSchema = new StructType(new List<StructField>()
{
new StructField("name", new StringType()),
new StructField("qty", new IntegerType()),
new StructField("date", new StringType())
});
DataFrame testData = spark.CreateDataFrame(products, productsSchema);
string testDataLocation = $"{dataPath}/productTable";
testData.Write().Mode("overwrite").Parquet(testDataLocation);
// CREATE INDEX
hyperspace.createIndex(testDF, IndexConfig("productIndex2", Seq("name"), Seq("date", "qty")))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val filter1 = testDF.filter("name = 'banana'")
val filter2 = testDF.filter("qty > 10")
val query = filter1.join(filter2, "name")
// Check Join index rule is applied properly.
hyperspace.explain(query)(displayHTML(_))
# CREATE INDEX
hyperspace.createIndex(test_df, IndexConfig("productIndex2", ["name"], ["date", "qty"]))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
filter1 = test_df.filter("name = 'banana'")
filter2 = test_df.filter("qty > 10")
query = filter1.join(filter2, "name")
# Check Join index rule is applied properly.
hyperspace.explain(query, True, displayHTML)
// CREATE INDEX
DataFrame testDF = spark.Read().Parquet(testDataLocation);
var productIndex2Config = new IndexConfig("productIndex", new string[] {"name"}, new string[] {"date", "qty"});
hyperspace.CreateIndex(testDF, productIndex2Config);
// Check Join index rule is applied properly.
DataFrame filter1 = testDF.Filter("name = 'banana'");
DataFrame filter2 = testDF.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,date#609,qty#608] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#631, qty#632, date#633]
+- *(2) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(2) FileScan parquet [name#631,date#633,qty#632] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#607, qty#608, date#609, qty#632, date#633]
+- SortMergeJoin [name#607], [name#631], Inner
:- *(2) Sort [name#607 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#607, 200), [id=#453]
: +- *(1) Project [name#607, qty#608, date#609]
: +- *(1) Filter (isnotnull(name#607) && (name#607 = banana))
: +- *(1) FileScan parquet [name#607,qty#608,date#609] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#631 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#631, 200), [id=#459]
+- *(3) Project [name#631, qty#632, date#633]
+- *(3) Filter (((isnotnull(qty#632) && (qty#632 > 10)) && isnotnull(name#631)) && (name#631 = banana))
+- *(3) FileScan parquet [name#631,qty#632,date#633] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
// Append new files.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
# Append new files.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
// Append new files.
var appendProducts = new List<GenericRow>()
{
new GenericRow(new object[] {"orange", 13, "2020-11-01"}),
new GenericRow(new object[] {"banana", 5, "2020-11-01"})
};
DataFrame appendData = spark.CreateDataFrame(appendProducts, productsSchema);
appendData.Write().Mode("Append").Parquet(testDataLocation);
L'analisi ibrida è disabilitata per impostazione predefinita. Pertanto, si noterà che, poiché sono stati aggiunti nuovi dati, Hyperspace deciderà di non usare l'indice.
Nell'output non verranno visualizzate le differenze nel piano (di conseguenza, nessuna evidenziazione).
// Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Hybrid Scan configs are false by default.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")
spark.conf.set("spark.hyperspace.index.hybridscan.delete.enabled", "false")
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Hybrid Scan configs are false by default.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "false");
spark.Conf().Set("spark.hyperspace.index.hybridscan.delete.enabled", "false");
DataFrame testDFWithAppend = spark.Read().Parquet(testDataLocation);
DataFrame filter1 = testDFWithAppend.Filter("name = 'banana'");
DataFrame filter2 = testDFWithAppend.Filter("qty > 10");
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#589]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#595]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#685, date#686]
+- SortMergeJoin [name#678], [name#684], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#536]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#684 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#684, 200), [id=#542]
+- *(3) Project [name#684, qty#685, date#686]
+- *(3) Filter (((isnotnull(qty#685) && (qty#685 > 10)) && (name#684 = banana)) && isnotnull(name#684))
+- *(3) FileScan parquet [name#684,qty#685,date#686] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
|banana| 1|2020-10-01| 11|2020-10-03|
+------+---+----------+---+----------
Abilitare l'analisi ibrida
Nel piano con gli indici, è possibile visualizzare il partizionamento hash di Exchange necessario solo per i file accodati in modo da poter comunque utilizzare i dati di indice "casuali" con i file aggiunti. BucketUnion viene usato per unire i file aggiunti "casuali" con i dati dell'indice.
// Enable Hybrid Scan config. "delete" config is not necessary since we only appended data.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
spark.enableHyperspace
// Need to redefine query to recalculate the query plan.
val query = filter1.join(filter2, "name")
hyperspace.explain(query)(displayHTML(_))
query.show
# Enable Hybrid Scan config. "delete" config is not necessary.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")
# Need to redefine query to recalculate the query plan.
query = filter1.join(filter2, "name")
hyperspace.explain(query, True, displayHTML)
query.show()
// Enable Hybrid Scan config. "delete" config is not necessary.
spark.Conf().Set("spark.hyperspace.index.hybridscan.enabled", "true");
spark.EnableHyperspace();
// Need to redefine query to recalculate the query plan.
DataFrame query = filter1.Join(filter2, filter1.Col("name") == filter2.Col("name"));
query.Show();
hyperspace.Explain(query, true, input => DisplayHTML(input));
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(3) Sort [name#678 ASC NULLS FIRST], false, 0
: +- BucketUnion 200 buckets, bucket columns: [name]
: :- *(1) Project [name#678, qty#679, date#680]
: : +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: : +- *(1) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
: +- Exchange hashpartitioning(name#678, 200), [id=#775]
: +- *(2) Project [name#678, qty#679, date#680]
: +- *(2) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(2) FileScan parquet [name#678,date#680,qty#679] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(6) Sort [name#731 ASC NULLS FIRST], false, 0
+- BucketUnion 200 buckets, bucket columns: [name]
:- *(4) Project [name#731, qty#732, date#733]
: +- *(4) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
: +- *(4) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- Exchange hashpartitioning(name#731, 200), [id=#783]
+- *(5) Project [name#731, qty#732, date#733]
+- *(5) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(5) FileScan parquet [name#731,date#733,qty#732] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Plan without indexes:
=============================================================
Project [name#678, qty#679, date#680, qty#732, date#733]
+- SortMergeJoin [name#678], [name#731], Inner
:- *(2) Sort [name#678 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#678, 200), [id=#701]
: +- *(1) Project [name#678, qty#679, date#680]
: +- *(1) Filter (isnotnull(name#678) && (name#678 = banana))
: +- *(1) FileScan parquet [name#678,qty#679,date#680] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#731 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#731, 200), [id=#707]
+- *(3) Project [name#731, qty#732, date#733]
+- *(3) Filter (((isnotnull(qty#732) && (qty#732 > 10)) && isnotnull(name#731)) && (name#731 = banana))
+- *(3) FileScan parquet [name#731,qty#732,date#733] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=0
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
Aggiornamento incrementale degli indici
Quando si è pronti per aggiornare gli indici, ma non si desidera ricompilare l'intero indice, Hyperspace supporta l'aggiornamento degli indici in modo incrementale usando l'API hs.refreshIndex("name", "incremental")
. In questo modo si elimina la necessità di una ricompilazione completa dell'indice da zero, utilizzando i file di indice creati in precedenza e aggiornando gli indici solo sui dati appena aggiunti.
Naturalmente, è consigliabile assicurarsi di usare periodicamente l'API complementare optimizeIndex
(illustrata di seguito) per assicurarsi che non vengano visualizzate le regressioni delle prestazioni. È consigliabile chiamare Ottimizza almeno una volta per ogni 10 volte che si chiama refreshIndex(..., "incremental")
, presupponendo che i dati aggiunti/rimossi siano < 10% del set di dati originale. Ad esempio, se il set di dati originale è 100 GB e sono stati aggiunti/rimossi dati in incrementi/decrementi di 1 GB, è possibile chiamare refreshIndex
10 volte prima di chiamare optimizeIndex
. Si noti che questo esempio viene fornito solo a scopo illustrativo e che deve quindi essere adattato ai carichi di lavoro.
Nell'esempio seguente si noti l'aggiunta di un nodo Ordina nel piano di query quando vengono usati gli indici. Ciò è dovuto al fatto che gli indici parziali vengono creati nei file di dati aggiunti, causando l'introduzione di Spark a Sort
. Si noti anche che Shuffle
, ovvero Exchange è ancora eliminato dal piano, per offrire un’accelerazione appropriata.
def query(): DataFrame = {
val testDFWithAppend = spark.read.parquet(testDataLocation)
val filter1 = testDFWithAppend.filter("name = 'banana'")
val filter2 = testDFWithAppend.filter("qty > 10")
filter1.join(filter2, "name")
}
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query())(displayHTML(_))
query().show
def query():
test_df_with_append = spark.read.parquet(testdata_location)
filter1 = test_df_with_append.filter("name = 'banana'")
filter2 = test_df_with_append.filter("qty > 10")
return filter1.join(filter2, "name")
hyperspace.refreshIndex("productIndex2", "incremental")
hyperspace.explain(query(), True, displayHTML)
query().show()
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(1) Sort [name#820 ASC NULLS FIRST], false, 0
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,date#822,qty#821] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Sort [name#826 ASC NULLS FIRST], false, 0
+- *(2) Project [name#826, qty#827, date#828]
+- *(2) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(2) FileScan parquet [name#826,date#828,qty#827] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#820, qty#821, date#822, qty#827, date#828]
+- SortMergeJoin [name#820], [name#826], Inner
:- *(2) Sort [name#820 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#820, 200), [id=#927]
: +- *(1) Project [name#820, qty#821, date#822]
: +- *(1) Filter (isnotnull(name#820) && (name#820 = banana))
: +- *(1) FileScan parquet [name#820,qty#821,date#822] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#826 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#826, 200), [id=#933]
+- *(3) Project [name#826, qty#827, date#828]
+- *(3) Filter (((isnotnull(qty#827) && (qty#827 > 10)) && (name#826 = banana)) && isnotnull(name#826))
+- *(3) FileScan parquet [name#826,qty#827,date#828] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), EqualTo(name,banana), IsNotNull(name)], ReadSchema: struct
+------+---+----------+---+----------+
| name|qty| date|qty| date|
+------+---+----------+---+----------+
|banana| 1|2020-10-01| 11|2020-10-03|
|banana| 11|2020-10-03| 11|2020-10-03|
|banana| 5|2020-11-01| 11|2020-10-03|
+------+---+----------+---+----------+
Ottimizzare il layout degli indici
Dopo aver chiamato l'aggiornamento incrementale più volte sui dati appena accodati (ad esempio, se l'utente scrive nei dati in batch di piccole dimensioni o nel caso di scenari di streaming), il numero di file di indice tende a diventare elevato influendo pertanto sulle prestazioni dell'indice (numero elevato di file di piccole dimensioni). Hyperspace fornisce l'API hyperspace.optimizeIndex("indexName")
per ottimizzare il layout dell'indice e ridurre il problema relativo ai file di grandi dimensioni.
Nel piano seguente, si noti che Hyperspace ha rimosso il nodo Ordina aggiuntivo nel piano di query. Ottimizza consente di evitare l'ordinamento per qualsiasi bucket di indice che contiene un solo file. Tuttavia, questo valore sarà true solo se TUTTI i bucket di indice hanno al massimo 1 file per bucket, dopo optimizeIndex
.
// Append some more data and call refresh again.
val appendData = Seq(
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")).toDF("name", "qty", "date")
appendData.write.mode("append").parquet(testDataLocation)
hyperspace.refreshIndex("productIndex2", "incremental")
# Append some more data and call refresh again.
append_data = [
("orange", 13, "2020-11-01"),
("banana", 5, "2020-11-01")
]
append_df = spark.createDataFrame(append_data, testdata_schema)
append_df.write.mode("append").parquet(testdata_location)
hyperspace.refreshIndex("productIndex2", "incremental"
// Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query())(displayHTML(_))
# Call optimize. Ensure that Sort is removed after optimization (This is possible here because after optimize, in this case, every bucket contains only 1 file.).
hyperspace.optimizeIndex("productIndex2")
hyperspace.explain(query(), True, displayHTML)
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,date#956,qty#955] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#960, qty#961, date#962]
+- *(2) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(2) FileScan parquet [name#960,date#962,qty#961] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#954, qty#955, date#956, qty#961, date#962]
+- SortMergeJoin [name#954], [name#960], Inner
:- *(2) Sort [name#954 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#954, 200), [id=#1070]
: +- *(1) Project [name#954, qty#955, date#956]
: +- *(1) Filter (isnotnull(name#954) && (name#954 = banana))
: +- *(1) FileScan parquet [name#954,qty#955,date#956] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#960 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#960, 200), [id=#1076]
+- *(3) Project [name#960, qty#961, date#962]
+- *(3) Filter (((isnotnull(qty#961) && (qty#961 > 10)) && isnotnull(name#960)) && (name#960 = banana))
+- *(3) FileScan parquet [name#960,qty#961,date#962] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=3
Ottimizzare le modalità
La modalità predefinita per l'ottimizzazione è la modalità "rapida" in cui i file più piccoli di una soglia predefinita vengono scelti per l'ottimizzazione. Per ottimizzare l'effetto dell'ottimizzazione, Hyperspace consente un'altra modalità di ottimizzazione "completa", come illustrato di seguito. Questa modalità seleziona tutti i file di indice per l'ottimizzazione indipendentemente dalle dimensioni del file e crea il layout migliore possibile dell'indice. Tale modalità è inoltre più lenta rispetto a quella di ottimizzazione predefinita perché vengono elaborati più dati.
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query())(displayHTML(_))
hyperspace.optimizeIndex("productIndex2", "full")
hyperspace.explain(query(), True, displayHTML)
Risultato:
=============================================================
Plan with indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,date#1002,qty#1001] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
+- *(2) Project [name#1006, qty#1007, date#1008]
+- *(2) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(2) FileScan parquet [name#1006,date#1008,qty#1007] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/p..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct, SelectedBucketsCount: 1 out of 200
=============================================================
Plan without indexes:
=============================================================
Project [name#1000, qty#1001, date#1002, qty#1007, date#1008]
+- SortMergeJoin [name#1000], [name#1006], Inner
:- *(2) Sort [name#1000 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#1000, 200), [id=#1160]
: +- *(1) Project [name#1000, qty#1001, date#1002]
: +- *(1) Filter (isnotnull(name#1000) && (name#1000 = banana))
: +- *(1) FileScan parquet [name#1000,qty#1001,date#1002] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
+- *(4) Sort [name#1006 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#1006, 200), [id=#1166]
+- *(3) Project [name#1006, qty#1007, date#1008]
+- *(3) Filter (((isnotnull(qty#1007) && (qty#1007 > 10)) && isnotnull(name#1006)) && (name#1006 = banana))
+- *(3) FileScan parquet [name#1006,qty#1007,date#1008] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/data-777519/prod..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct
=============================================================
Indexes used:
=============================================================
productIndex2:abfss://datasets@hyperspacebenchmark.dfs.core.windows.net/hyperspace/indexes-777519/productIndex2/v__=4